[19/26] kafka git commit: KAFKA-2774: Rename Copycat to Kafka Connect
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java -- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java new file mode 100644 index 000..85db168 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -0,0 +1,920 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.runtime.distributed; + +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.errors.AlreadyExistsException; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.NotFoundException; +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.HerderConnectorContext; +import org.apache.kafka.connect.runtime.TaskConfig; +import org.apache.kafka.connect.runtime.Worker; +import org.apache.kafka.connect.runtime.rest.RestServer; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.storage.KafkaConfigStorage; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * + * Distributed "herder" that coordinates with other 
workers to spread work across multiple processes. + * + * + * Under the hood, this is implemented as a group managed by Kafka's group membership facilities (i.e. the generalized + * group/consumer coordinator). Each instance of DistributedHerder joins the group and indicates what its current + * configuration state is (where it is in the configuration log). The group coordinator selects one member to take + * this information and assign each instance a subset of the active connectors & tasks to execute. This assignment + * is currently performed in a simple round-robin fashion, but this is not guaranteed -- the herder may also choose + * to, e.g., use a sticky assignment to avoid the usual start/stop costs associated with connectors and tasks. Once + * an assignment is received, the DistributedHerder simply runs its assigned connectors and tasks in a Worker. + * + * + * In addition to distributing work, the DistributedHerder uses the leader determined during the work assignment + * to select a leader for this generation of the group who is responsible for other tasks that can only be performed + * by a single node at a time. Most importantly, this includes writing updated configurations for connectors and tasks, + * (and therefore, also for creating, destroying, and scaling up/down connectors). + * + */ +public class DistributedHerder implements Herder, Runnable { +private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class); + +private static
[19/26] kafka git commit: KAFKA-2774: Rename Copycat to Kafka Connect
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java -- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java new file mode 100644 index 000..85db168 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -0,0 +1,920 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.runtime.distributed; + +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.errors.AlreadyExistsException; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.NotFoundException; +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.HerderConnectorContext; +import org.apache.kafka.connect.runtime.TaskConfig; +import org.apache.kafka.connect.runtime.Worker; +import org.apache.kafka.connect.runtime.rest.RestServer; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.storage.KafkaConfigStorage; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * + * Distributed "herder" that coordinates with other 
workers to spread work across multiple processes. + * + * + * Under the hood, this is implemented as a group managed by Kafka's group membership facilities (i.e. the generalized + * group/consumer coordinator). Each instance of DistributedHerder joins the group and indicates what its current + * configuration state is (where it is in the configuration log). The group coordinator selects one member to take + * this information and assign each instance a subset of the active connectors & tasks to execute. This assignment + * is currently performed in a simple round-robin fashion, but this is not guaranteed -- the herder may also choose + * to, e.g., use a sticky assignment to avoid the usual start/stop costs associated with connectors and tasks. Once + * an assignment is received, the DistributedHerder simply runs its assigned connectors and tasks in a Worker. + * + * + * In addition to distributing work, the DistributedHerder uses the leader determined during the work assignment + * to select a leader for this generation of the group who is responsible for other tasks that can only be performed + * by a single node at a time. Most importantly, this includes writing updated configurations for connectors and tasks, + * (and therefore, also for creating, destroying, and scaling up/down connectors). + * + */ +public class DistributedHerder implements Herder, Runnable { +private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class); + +private static