http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSpec.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSpec.java deleted file mode 100644 index e15c4e9..0000000 --- a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSpec.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.trogdor.fault; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import org.apache.kafka.common.utils.Utils; - - -/** - * The specification for a fault. - */ -@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, - include = JsonTypeInfo.As.PROPERTY, - property = "class") -public interface FaultSpec { - class Util { - private static final String SPEC_STRING = "Spec"; - - public static Fault createFault(String faultId, FaultSpec faultSpec) throws ClassNotFoundException { - String faultSpecClassName = faultSpec.getClass().getName(); - if (!faultSpecClassName.endsWith(SPEC_STRING)) { - throw new RuntimeException("FaultSpec class name must end with " + SPEC_STRING); - } - String faultClassName = faultSpecClassName.substring(0, - faultSpecClassName.length() - SPEC_STRING.length()); - return Utils.newParameterizedInstance(faultClassName, - String.class, faultId, - FaultSpec.class, faultSpec); - } - } - - /** - * Get the start time of this fault in ms. - */ - @JsonProperty - long startMs(); - - /** - * Get the duration of this fault in ms. - */ - @JsonProperty - long durationMs(); -}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java deleted file mode 100644 index cba8419..0000000 --- a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.trogdor.fault; - -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import org.apache.kafka.trogdor.common.JsonUtil; -import java.util.Objects; - -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, - include = JsonTypeInfo.As.PROPERTY, - property = "stateName") -@JsonSubTypes({ - @JsonSubTypes.Type(value = DoneState.class, name = "done"), - @JsonSubTypes.Type(value = PendingState.class, name = "pending"), - @JsonSubTypes.Type(value = RunningState.class, name = "running"), - @JsonSubTypes.Type(value = SendingState.class, name = "sending") - }) -public abstract class FaultState { - @Override - public final boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - return toString().equals(o.toString()); - } - - @Override - public final int hashCode() { - return Objects.hashCode(toString()); - } - - @Override - public final String toString() { - return JsonUtil.toJsonString(this); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java deleted file mode 100644 index cf3270a..0000000 --- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.trogdor.fault; - -import org.apache.kafka.trogdor.common.Node; -import org.apache.kafka.trogdor.common.Platform; -import org.apache.kafka.trogdor.common.Topology; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; - -public class NetworkPartitionFault extends AbstractFault { - private static final Logger log = LoggerFactory.getLogger(NetworkPartitionFault.class); - - private final List<Set<String>> partitions; - - public NetworkPartitionFault(String id, FaultSpec spec) { - super(id, spec); - NetworkPartitionFaultSpec faultSpec = (NetworkPartitionFaultSpec) spec; - this.partitions = new ArrayList<>(); - HashSet<String> prevNodes = new HashSet<>(); - for (List<String> partition : faultSpec.partitions()) { - for (String nodeName : partition) { - if (prevNodes.contains(nodeName)) { - throw new RuntimeException("Node " + nodeName + - " appears in more than one partition."); - } - prevNodes.add(nodeName); - this.partitions.add(new HashSet<String>(partition)); - } - } - } - - @Override - protected void handleActivation(long now, Platform platform) throws Exception { - log.info("Activating NetworkPartitionFault..."); - runIptablesCommands(platform, "-A"); - } - - @Override - protected void handleDeactivation(long now, Platform platform) throws Exception { - log.info("Deactivating NetworkPartitionFault..."); - runIptablesCommands(platform, "-D"); - } - - private void runIptablesCommands(Platform platform, String iptablesAction) throws Exception { - Node curNode = platform.curNode(); - Topology topology = platform.topology(); - TreeSet<String> toBlock = new TreeSet<>(); - for (Set<String> partition : partitions) { - if (!partition.contains(curNode.name())) { - for (String nodeName : partition) { - toBlock.add(nodeName); - } - } - } - for (String nodeName : toBlock) { - Node node = topology.node(nodeName); - InetAddress addr = InetAddress.getByName(node.hostname()); - platform.runCommand(new String[] { - "sudo", "iptables", iptablesAction, "INPUT", "-p", "tcp", "-s", - addr.getHostAddress(), "-j", "DROP", "-m", "comment", "--comment", nodeName - }); - } - } - - @Override - public Set<String> targetNodes(Topology topology) { - Set<String> targetNodes = new HashSet<>(); - for (Set<String> partition : partitions) { - targetNodes.addAll(partition); - } - return targetNodes; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultController.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultController.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultController.java new file mode 100644 index 0000000..d90534f --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultController.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.fault; + +import org.apache.kafka.trogdor.common.Topology; +import org.apache.kafka.trogdor.task.TaskController; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class NetworkPartitionFaultController implements TaskController { + private final List<Set<String>> partitionSets; + + public NetworkPartitionFaultController(List<Set<String>> partitionSets) { + this.partitionSets = partitionSets; + } + + @Override + public Set<String> targetNodes(Topology topology) { + Set<String> targetNodes = new HashSet<>(); + for (Set<String> partitionSet : partitionSets) { + targetNodes.addAll(partitionSet); + } + return targetNodes; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java index d734dce..7b9ccc4 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java @@ -19,15 +19,19 @@ package org.apache.kafka.trogdor.fault; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.kafka.trogdor.common.JsonUtil; +import org.apache.kafka.trogdor.task.TaskController; +import org.apache.kafka.trogdor.task.TaskSpec; +import org.apache.kafka.trogdor.task.TaskWorker; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; -import java.util.Objects; +import java.util.Set; /** * The specification for a fault that creates a network partition. */ -public class NetworkPartitionFaultSpec extends AbstractFaultSpec { +public class NetworkPartitionFaultSpec extends TaskSpec { private final List<List<String>> partitions; @JsonCreator @@ -44,22 +48,28 @@ public class NetworkPartitionFaultSpec extends AbstractFaultSpec { } @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - NetworkPartitionFaultSpec that = (NetworkPartitionFaultSpec) o; - return Objects.equals(startMs(), that.startMs()) && - Objects.equals(durationMs(), that.durationMs()) && - Objects.equals(partitions, that.partitions); + public TaskController newController(String id) { + return new NetworkPartitionFaultController(partitionSets()); } @Override - public int hashCode() { - return Objects.hash(startMs(), durationMs(), partitions); + public TaskWorker newTaskWorker(String id) { + return new NetworkPartitionFaultWorker(id, partitionSets()); } - @Override - public String toString() { - return JsonUtil.toJsonString(this); + private List<Set<String>> partitionSets() { + List<Set<String>> partitionSets = new ArrayList<>(); + HashSet<String> prevNodes = new HashSet<>(); + for (List<String> partition : this.partitions()) { + for (String nodeName : partition) { + if (prevNodes.contains(nodeName)) { + throw new RuntimeException("Node " + nodeName + + " appears in more than one partition."); + } + prevNodes.add(nodeName); + partitionSets.add(new HashSet<>(partition)); + } + } + return partitionSets; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java new file mode 100644 index 0000000..787c5e0 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.fault; + +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.trogdor.common.Node; +import org.apache.kafka.trogdor.common.Platform; +import org.apache.kafka.trogdor.common.Topology; +import org.apache.kafka.trogdor.task.TaskWorker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicReference; + +public class NetworkPartitionFaultWorker implements TaskWorker { + private static final Logger log = LoggerFactory.getLogger(NetworkPartitionFaultWorker.class); + + private final String id; + + private final List<Set<String>> partitionSets; + + public NetworkPartitionFaultWorker(String id, List<Set<String>> partitionSets) { + this.id = id; + this.partitionSets = partitionSets; + } + + @Override + public void start(Platform platform, AtomicReference<String> status, + KafkaFutureImpl<String> errorFuture) throws Exception { + log.info("Activating NetworkPartitionFault {}.", id); + runIptablesCommands(platform, "-A"); + } + + @Override + public void stop(Platform platform) throws Exception { + log.info("Deactivating NetworkPartitionFault {}.", id); + runIptablesCommands(platform, "-D"); + } + + private void runIptablesCommands(Platform platform, String iptablesAction) throws Exception { + Node curNode = platform.curNode(); + Topology topology = platform.topology(); + TreeSet<String> toBlock = new TreeSet<>(); + for (Set<String> partitionSet : partitionSets) { + if (!partitionSet.contains(curNode.name())) { + for (String nodeName : partitionSet) { + toBlock.add(nodeName); + } + } + } + for (String nodeName : toBlock) { + Node node = topology.node(nodeName); + InetAddress addr = InetAddress.getByName(node.hostname()); + platform.runCommand(new String[] { + "sudo", "iptables", iptablesAction, "INPUT", "-p", "tcp", "-s", + addr.getHostAddress(), "-j", "DROP", "-m", "comment", "--comment", nodeName + }); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java deleted file mode 100644 index 70b4965..0000000 --- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.trogdor.fault; - -import org.apache.kafka.trogdor.common.Node; -import org.apache.kafka.trogdor.common.Platform; -import org.apache.kafka.trogdor.common.Topology; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -public class NoOpFault extends AbstractFault { - private static final Logger log = LoggerFactory.getLogger(NoOpFault.class); - - public NoOpFault(String id, FaultSpec spec) { - super(id, spec); - } - - @Override - protected void handleActivation(long now, Platform platform) throws Exception { - log.info("Activating NoOpFault..."); - } - - @Override - protected void handleDeactivation(long now, Platform platform) throws Exception { - log.info("Deactivating NoOpFault..."); - } - - @Override - public Set<String> targetNodes(Topology topology) { - Set<String> set = new HashSet<>(); - for (Map.Entry<String, Node> entry : topology.nodes().entrySet()) { - if (Node.Util.getTrogdorAgentPort(entry.getValue()) > 0) { - set.add(entry.getKey()); - } - } - return set; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFaultSpec.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFaultSpec.java deleted file mode 100644 index 1d4b94d..0000000 --- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFaultSpec.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.trogdor.fault; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.Objects; - -/** - * The specification for a fault that does nothing. - * - * This fault type exists mainly to test the fault injection system. - */ -public class NoOpFaultSpec extends AbstractFaultSpec { - @JsonCreator - public NoOpFaultSpec(@JsonProperty("startMs") long startMs, - @JsonProperty("durationMs") long durationMs) { - super(startMs, durationMs); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - NoOpFaultSpec that = (NoOpFaultSpec) o; - return Objects.equals(startMs(), that.startMs()) && - Objects.equals(durationMs(), that.durationMs()); - } - - @Override - public int hashCode() { - return Objects.hash(startMs(), durationMs()); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java deleted file mode 100644 index 57c8e88..0000000 --- a/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.trogdor.fault; - -import com.fasterxml.jackson.annotation.JsonCreator; - -/** - * The state a fault is in on the agent or controller when we haven't yet done - * anything with it. - */ -public class PendingState extends FaultState { - @JsonCreator - public PendingState() { - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java deleted file mode 100644 index 1b81bf5..0000000 --- a/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.trogdor.fault; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -/** - * The state a fault is in on the agent when it is running. - */ -public class RunningState extends FaultState { - private final long startedMs; - - @JsonCreator - public RunningState(@JsonProperty("startedMs") long startedMs) { - this.startedMs = startedMs; - } - - @JsonProperty - public long startedMs() { - return startedMs; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java deleted file mode 100644 index edfbed2..0000000 --- a/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.trogdor.fault; - -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.kafka.common.utils.Utils; - -import java.util.TreeMap; -import java.util.Set; - -/** - * The state a fault is in on the controller when it is scheduled to be sent to several agents. - */ -public class SendingState extends FaultState { - private final TreeMap<String, Boolean> nodes; - private int remainingNodes; - - public SendingState(@JsonProperty("nodeNames") Set<String> nodeNames) { - this.nodes = new TreeMap<>(); - for (String nodeName : nodeNames) { - nodes.put(nodeName, false); - } - remainingNodes = nodeNames.size(); - } - - @JsonProperty - public synchronized Set<String> nodeNames() { - return nodes.keySet(); - } - - /** - * Complete a send operation. - * - * @param nodeName The name of the node we sent to. - * @return True if there are no more send operations left. - */ - public synchronized boolean completeSend(String nodeName) { - if (!nodes.containsKey(nodeName)) { - throw new RuntimeException("Node " + nodeName + " was not to supposed to " + - "receive this fault. The fault was scheduled on nodes: " + - Utils.join(nodes.keySet(), ", ")); - } - if (nodes.put(nodeName, true)) { - throw new RuntimeException("Node " + nodeName + " already received this fault."); - } - remainingNodes--; - return remainingNodes == 0; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentFaultsResponse.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentFaultsResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentFaultsResponse.java deleted file mode 100644 index a1b5246..0000000 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentFaultsResponse.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.trogdor.rest; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.kafka.trogdor.common.JsonUtil; - -import java.util.Map; - -/** - * Response to GET /faults - */ -public class AgentFaultsResponse extends FaultDataMap { - @JsonCreator - public AgentFaultsResponse(@JsonProperty("faults") Map<String, FaultData> faults) { - super(faults); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - AgentFaultsResponse that = (AgentFaultsResponse) o; - return super.equals(that); - } - - @Override - public int hashCode() { - return super.hashCode(); - } - - @Override - public String toString() { - return JsonUtil.toJsonString(this); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java index 8e32f87..77b4bfb 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java @@ -19,41 +19,30 @@ package org.apache.kafka.trogdor.rest; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.kafka.trogdor.common.JsonUtil; -import java.util.Objects; +import java.util.TreeMap; /** - * The status of the Trogdor agent. + * A response from the Trogdor agent about the worker states and specifications. */ -public class AgentStatusResponse { - private final long startTimeMs; +public class AgentStatusResponse extends Message { + private final long serverStartMs; + private final TreeMap<String, WorkerState> workers; @JsonCreator - public AgentStatusResponse(@JsonProperty("startTimeMs") long startTimeMs) { - this.startTimeMs = startTimeMs; + public AgentStatusResponse(@JsonProperty("serverStartMs") long serverStartMs, + @JsonProperty("workers") TreeMap<String, WorkerState> workers) { + this.serverStartMs = serverStartMs; + this.workers = workers; } @JsonProperty - public long startTimeMs() { - return startTimeMs; + public long serverStartMs() { + return serverStartMs; } - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - AgentStatusResponse that = (AgentStatusResponse) o; - return Objects.equals(startTimeMs, that.startTimeMs); - } - - @Override - public int hashCode() { - return Objects.hash(startTimeMs); - } - - @Override - public String toString() { - return JsonUtil.toJsonString(this); + @JsonProperty + public TreeMap<String, WorkerState> workers() { + return workers; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorFaultsResponse.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorFaultsResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorFaultsResponse.java deleted file mode 100644 index df26274..0000000 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorFaultsResponse.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.trogdor.rest; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.kafka.trogdor.common.JsonUtil; - -import java.util.Map; - -/** - * Response to GET /faults - */ -public class CoordinatorFaultsResponse extends FaultDataMap { - @JsonCreator - public CoordinatorFaultsResponse(@JsonProperty("faults") Map<String, FaultData> faults) { - super(faults); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - CoordinatorFaultsResponse that = (CoordinatorFaultsResponse) o; - return super.equals(that); - } - - @Override - public int hashCode() { - return super.hashCode(); - } - - @Override - public String toString() { - return JsonUtil.toJsonString(this); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorShutdownRequest.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorShutdownRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorShutdownRequest.java new file mode 100644 index 0000000..1aacaaf --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorShutdownRequest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * A request to the Trogdor coordinator to shut down. + */ +public class CoordinatorShutdownRequest extends Message { + private final boolean stopAgents; + + @JsonCreator + public CoordinatorShutdownRequest(@JsonProperty("stopAgents") boolean stopAgents) { + this.stopAgents = stopAgents; + } + + @JsonProperty + public boolean stopAgents() { + return stopAgents; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java index 348e310..8840d29 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java @@ -19,41 +19,20 @@ package org.apache.kafka.trogdor.rest; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.kafka.trogdor.common.JsonUtil; - -import java.util.Objects; /** - * The status of the Trogdor coordinator. + * A status response from the Trogdor coordinator. */ -public class CoordinatorStatusResponse { - private final long startTimeMs; +public class CoordinatorStatusResponse extends Message { + private final long serverStartMs; @JsonCreator - public CoordinatorStatusResponse(@JsonProperty("startTimeMs") long startTimeMs) { - this.startTimeMs = startTimeMs; + public CoordinatorStatusResponse(@JsonProperty("serverStartMs") long serverStartMs) { + this.serverStartMs = serverStartMs; } @JsonProperty - public long startTimeMs() { - return startTimeMs; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - CoordinatorStatusResponse that = (CoordinatorStatusResponse) o; - return Objects.equals(startTimeMs, that.startTimeMs); - } - - @Override - public int hashCode() { - return Objects.hash(startTimeMs); - } - - @Override - public String toString() { - return JsonUtil.toJsonString(this); + public long serverStartMs() { + return serverStartMs; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateAgentFaultRequest.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateAgentFaultRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateAgentFaultRequest.java deleted file mode 100644 index 6e772d9..0000000 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateAgentFaultRequest.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.trogdor.rest; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.kafka.trogdor.common.JsonUtil; -import org.apache.kafka.trogdor.fault.FaultSpec; - -import java.util.Objects; - -/** - * A request to the Trogdor agent to create a fault. - */ -public class CreateAgentFaultRequest { - private final String id; - private final FaultSpec spec; - - @JsonCreator - public CreateAgentFaultRequest(@JsonProperty("id") String id, - @JsonProperty("spec") FaultSpec spec) { - this.id = id; - this.spec = spec; - } - - @JsonProperty - public String id() { - return id; - } - - @JsonProperty - public FaultSpec spec() { - return spec; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - CreateAgentFaultRequest that = (CreateAgentFaultRequest) o; - return Objects.equals(id, that.id) && - Objects.equals(spec, that.spec); - } - - @Override - public int hashCode() { - return Objects.hash(id, spec); - } - - @Override - public String toString() { - return JsonUtil.toJsonString(this); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateCoordinatorFaultRequest.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateCoordinatorFaultRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateCoordinatorFaultRequest.java deleted file mode 100644 index ec00cf3..0000000 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateCoordinatorFaultRequest.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.trogdor.rest; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.kafka.trogdor.common.JsonUtil; -import org.apache.kafka.trogdor.fault.FaultSpec; - -import java.util.Objects; - -/** - * A request to the Trogdor coordinator to create a fault. - */ -public class CreateCoordinatorFaultRequest { - private final String id; - private final FaultSpec spec; - - @JsonCreator - public CreateCoordinatorFaultRequest(@JsonProperty("id") String id, - @JsonProperty("spec") FaultSpec spec) { - this.id = id; - this.spec = spec; - } - - @JsonProperty - public String id() { - return id; - } - - @JsonProperty - public FaultSpec spec() { - return spec; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - CreateCoordinatorFaultRequest that = (CreateCoordinatorFaultRequest) o; - return Objects.equals(id, that.id) && - Objects.equals(spec, that.spec); - } - - @Override - public int hashCode() { - return Objects.hash(id, spec); - } - - @Override - public String toString() { - return JsonUtil.toJsonString(this); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskRequest.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskRequest.java new file mode 100644 index 0000000..d463e36 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskRequest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.task.TaskSpec; + +/** + * A request to the Trogdor coorinator to create a task. + */ +public class CreateTaskRequest extends Message { + private final String id; + private final TaskSpec spec; + + @JsonCreator + public CreateTaskRequest(@JsonProperty("id") String id, + @JsonProperty("spec") TaskSpec spec) { + this.id = id; + this.spec = spec; + } + + @JsonProperty + public String id() { + return id; + } + + @JsonProperty + public TaskSpec spec() { + return spec; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java new file mode 100644 index 0000000..54ea0f2 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.task.TaskSpec; + +/** + * A response from the Trogdor coordinator about creating a task. + */ +public class CreateTaskResponse extends Message { + private final TaskSpec spec; + + @JsonCreator + public CreateTaskResponse(@JsonProperty("spec") TaskSpec spec) { + this.spec = spec; + } + + @JsonProperty + public TaskSpec spec() { + return spec; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java new file mode 100644 index 0000000..9f6e8dc --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.task.TaskSpec; + +/** + * A request to the Trogdor agent to create a worker. + */ +public class CreateWorkerRequest extends Message { + private final String id; + private final TaskSpec spec; + + @JsonCreator + public CreateWorkerRequest(@JsonProperty("id") String id, + @JsonProperty("spec") TaskSpec spec) { + this.id = id; + this.spec = spec; + } + + @JsonProperty + public String id() { + return id; + } + + @JsonProperty + public TaskSpec spec() { + return spec; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java new file mode 100644 index 0000000..9e068ec --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.task.TaskSpec; + +/** + * A response from the Trogdor agent about creating a worker. + */ +public class CreateWorkerResponse extends Message { + private final TaskSpec spec; + + @JsonCreator + public CreateWorkerResponse(@JsonProperty("spec") TaskSpec spec) { + this.spec = spec; + } + + @JsonProperty + public TaskSpec spec() { + return spec; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java deleted file mode 100644 index b2f7c91..0000000 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.trogdor.rest; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.kafka.trogdor.common.JsonUtil; -import org.apache.kafka.trogdor.fault.FaultSpec; -import org.apache.kafka.trogdor.fault.FaultState; - -import java.util.Map; -import java.util.Objects; - -/** - * Response to GET /faults - */ -public class FaultDataMap { - private final Map<String, FaultData> faults; - - public static class FaultData { - private final FaultSpec spec; - private final FaultState state; - - @JsonCreator - public FaultData(@JsonProperty("spec") FaultSpec spec, - @JsonProperty("state") FaultState state) { - this.spec = spec; - this.state = state; - } - - @JsonProperty - public FaultSpec spec() { - return spec; - } - - @JsonProperty - public FaultState state() { - return state; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - FaultData that = (FaultData) o; - return Objects.equals(spec, that.spec) && - Objects.equals(state, that.state); - } - - @Override - public int hashCode() { - return Objects.hash(spec, state); - } - } - - @JsonCreator - public FaultDataMap(@JsonProperty("faults") Map<String, FaultData> faults) { - this.faults = faults; - } - - @JsonProperty - public Map<String, FaultData> faults() { - return faults; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - FaultDataMap that = (FaultDataMap) o; - return Objects.equals(faults, that.faults); - } - - @Override - public int hashCode() { - return Objects.hashCode(faults); - } - - @Override - public String toString() { - return JsonUtil.toJsonString(this); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java index 1b23a9e..e61b7fe 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import org.apache.kafka.trogdor.common.JsonUtil; +import org.apache.kafka.trogdor.common.ThreadUtils; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; @@ -43,6 +44,10 @@ import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * Embedded server for the REST API that provides the control plane for Trogdor. @@ -50,7 +55,9 @@ import java.nio.charset.StandardCharsets; public class JsonRestServer { private static final Logger log = LoggerFactory.getLogger(JsonRestServer.class); - private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 2 * 1000; + private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 100; + + private final ScheduledExecutorService shutdownExecutor; private final Server jettyServer; @@ -63,6 +70,8 @@ public class JsonRestServer { * 0 to use a random port. */ public JsonRestServer(int port) { + this.shutdownExecutor = Executors.newSingleThreadScheduledExecutor( + ThreadUtils.createThreadFactory("JsonRestServerCleanupExecutor", false)); this.jettyServer = new Server(); this.connector = new ServerConnector(jettyServer); if (port > 0) { @@ -78,7 +87,6 @@ public class JsonRestServer { */ public void start(Object... resources) { log.info("Starting REST server"); - ResourceConfig resourceConfig = new ResourceConfig(); resourceConfig.register(new JacksonJsonProvider(JsonUtil.JSON_SERDE)); for (Object resource : resources) { @@ -119,17 +127,37 @@ public class JsonRestServer { return connector.getLocalPort(); } - public void stop() { - log.info("Stopping REST server"); + /** + * Initiate shutdown, but do not wait for it to complete. + */ + public void beginShutdown() { + if (!shutdownExecutor.isShutdown()) { + shutdownExecutor.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + try { + log.info("Stopping REST server"); + jettyServer.stop(); + jettyServer.join(); + log.info("REST server stopped"); + } catch (Exception e) { + log.error("Unable to stop REST server", e); + } finally { + jettyServer.destroy(); + } + shutdownExecutor.shutdown(); + return null; + } + }); + } + } - try { - jettyServer.stop(); - jettyServer.join(); - log.info("REST server stopped"); - } catch (Exception e) { - log.error("Unable to stop REST server", e); - } finally { - jettyServer.destroy(); + /** + * Wait for shutdown to complete. May be called prior to beginShutdown. + */ + public void waitForShutdown() throws InterruptedException { + while (!shutdownExecutor.isShutdown()) { + shutdownExecutor.awaitTermination(1, TimeUnit.DAYS); } } @@ -197,6 +225,24 @@ public class JsonRestServer { } } + public static <T> HttpResponse<T> httpRequest(String url, String method, + Object requestBodyData, TypeReference<T> responseFormat, int maxTries) + throws IOException, InterruptedException { + IOException exc = null; + for (int tries = 0; tries < maxTries; tries++) { + if (tries > 0) { + Thread.sleep(tries > 1 ? 10 : 2); + } + try { + return httpRequest(url, method, requestBodyData, responseFormat); + } catch (IOException e) { + log.info("{} {}: error: {}", method, url, e.getMessage()); + exc = e; + } + } + throw exc; + } + public static class HttpResponse<T> { private final T body; private final ErrorResponse error; http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/Message.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/Message.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/Message.java new file mode 100644 index 0000000..c2ee840 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/Message.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import org.apache.kafka.trogdor.common.JsonUtil; + +import java.util.Objects; + +public abstract class Message { + @Override + public final boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + return Objects.equals(toString(), o.toString()); + } + + @Override + public final int hashCode() { + return toString().hashCode(); + } + + @Override + public final String toString() { + return JsonUtil.toJsonString(this); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java new file mode 100644 index 0000000..3287801 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * A request to the Trogdor agent to stop a task. + */ +public class StopTaskRequest extends Message { + private final String id; + + @JsonCreator + public StopTaskRequest(@JsonProperty("id") String id) { + this.id = id; + } + + @JsonProperty + public String id() { + return id; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java new file mode 100644 index 0000000..f344dc9 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.task.TaskSpec; + +/** + * A response from the Trogdor coordinator about stopping a task. + */ +public class StopTaskResponse extends Message { + private final TaskSpec spec; + + @JsonCreator + public StopTaskResponse(@JsonProperty("spec") TaskSpec spec) { + this.spec = spec; + } + + @JsonProperty + public TaskSpec spec() { + return spec; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java new file mode 100644 index 0000000..54c689a --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * A request to the Trogdor agent to stop a worker. + */ +public class StopWorkerRequest extends Message { + private final String id; + + @JsonCreator + public StopWorkerRequest(@JsonProperty("id") String id) { + this.id = id; + } + + @JsonProperty + public String id() { + return id; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java new file mode 100644 index 0000000..7d5b468 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.task.TaskSpec; + +/** + * A response from the Trogdor agent about stopping a worker. + */ +public class StopWorkerResponse extends Message { + private final TaskSpec spec; + + @JsonCreator + public StopWorkerResponse(@JsonProperty("spec") TaskSpec spec) { + this.spec = spec; + } + + @JsonProperty + public TaskSpec spec() { + return spec; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java new file mode 100644 index 0000000..536d3f2 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.task.TaskSpec; + +/** + * The state a task is in once it's done. + */ +public class TaskDone extends TaskState { + /** + * The time on the coordinator when the task was started. + */ + private final long startedMs; + + /** + * The time on the coordinator when the task was completed. + */ + private final long doneMs; + + /** + * Empty if the task completed without error; the error message otherwise. + */ + private final String error; + + /** + * True if the task was manually cancelled, rather than terminating itself. + */ + private final boolean cancelled; + + @JsonCreator + public TaskDone(@JsonProperty("spec") TaskSpec spec, + @JsonProperty("startedMs") long startedMs, + @JsonProperty("doneMs") long doneMs, + @JsonProperty("error") String error, + @JsonProperty("cancelled") boolean cancelled) { + super(spec); + this.startedMs = startedMs; + this.doneMs = doneMs; + this.error = error; + this.cancelled = cancelled; + } + + @JsonProperty + public long startedMs() { + return startedMs; + } + + @JsonProperty + public long doneMs() { + return doneMs; + } + + @JsonProperty + public String error() { + return error; + } + + @JsonProperty + public boolean cancelled() { + return cancelled; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java new file mode 100644 index 0000000..b0162d3 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.task.TaskSpec; + +/** + * The state for a task which is still pending. + */ +public class TaskPending extends TaskState { + @JsonCreator + public TaskPending(@JsonProperty("spec") TaskSpec spec) { + super(spec); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java new file mode 100644 index 0000000..bff3676 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.task.TaskSpec; + +/** + * The state for a task which is being run by the agent. + */ +public class TaskRunning extends TaskState { + /** + * The time on the agent when the task was started. + */ + private final long startedMs; + + @JsonCreator + public TaskRunning(@JsonProperty("spec") TaskSpec spec, + @JsonProperty("startedMs") long startedMs) { + super(spec); + this.startedMs = startedMs; + } + + @JsonProperty + public long startedMs() { + return startedMs; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java new file mode 100644 index 0000000..28b6108 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.kafka.trogdor.task.TaskSpec; + +/** + * The state which a task is in on the Coordinator. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.PROPERTY, + property = "state") +@JsonSubTypes({ + @JsonSubTypes.Type(value = TaskPending.class, name = "PENDING"), + @JsonSubTypes.Type(value = TaskRunning.class, name = "RUNNING"), + @JsonSubTypes.Type(value = TaskStopping.class, name = "STOPPING"), + @JsonSubTypes.Type(value = TaskDone.class, name = "DONE") + }) +public abstract class TaskState extends Message { + private final TaskSpec spec; + + public TaskState(TaskSpec spec) { + this.spec = spec; + } + + @JsonProperty + public TaskSpec spec() { + return spec; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java new file mode 100644 index 0000000..4446b75 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.task.TaskSpec; + +/** + * The state for a task which is being stopped on the coordinator. + */ +public class TaskStopping extends TaskState { + /** + * The time on the agent when the task was received. + */ + private final long startedMs; + + @JsonCreator + public TaskStopping(@JsonProperty("spec") TaskSpec spec, + @JsonProperty("startedMs") long startedMs) { + super(spec); + this.startedMs = startedMs; + } + + @JsonProperty + public long startedMs() { + return startedMs; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksResponse.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksResponse.java new file mode 100644 index 0000000..d3b415b --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksResponse.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.TreeMap; + +/** + * The response to /coordinator/tasks + */ +public class TasksResponse extends Message { + private final TreeMap<String, TaskState> tasks; + + @JsonCreator + public TasksResponse(@JsonProperty("tasks") TreeMap<String, TaskState> tasks) { + this.tasks = tasks; + } + + @JsonProperty + public TreeMap<String, TaskState> tasks() { + return tasks; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java new file mode 100644 index 0000000..0f46b25 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.task.TaskSpec; + +/** + * The state a worker is in once it's done. + */ +public class WorkerDone extends WorkerState { + /** + * The time on the agent when the task was started. + */ + private final long startedMs; + + /** + * The time on the agent when the task was completed. + */ + private final long doneMs; + + /** + * The task status. The format will depend on the type of task that is + * being run. + */ + private final String status; + + /** + * Empty if the task completed without error; the error message otherwise. + */ + private final String error; + + @JsonCreator + public WorkerDone(@JsonProperty("spec") TaskSpec spec, + @JsonProperty("startedMs") long startedMs, + @JsonProperty("doneMs") long doneMs, + @JsonProperty("status") String status, + @JsonProperty("error") String error) { + super(spec); + this.startedMs = startedMs; + this.doneMs = doneMs; + this.status = status; + this.error = error; + } + + @JsonProperty + @Override + public long startedMs() { + return startedMs; + } + + @JsonProperty + public long doneMs() { + return doneMs; + } + + @JsonProperty + @Override + public String status() { + return status; + } + + @JsonProperty + public String error() { + return error; + } + + @Override + public boolean done() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java new file mode 100644 index 0000000..d3e3565 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.task.TaskSpec; + +/** + * When we're in the process of sending a TaskSpec to the Agent, the Worker is regarded + * as being in WorkerReceiving state. + */ +public final class WorkerReceiving extends WorkerState { + @JsonCreator + public WorkerReceiving(@JsonProperty("spec") TaskSpec spec) { + super(spec); + } +}