[19/26] kafka git commit: KAFKA-2774: Rename Copycat to Kafka Connect

2015-11-08 Thread gwenshap
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
--
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
new file mode 100644
index 000..85db168
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -0,0 +1,920 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.errors.AlreadyExistsException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderConnectorContext;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.storage.KafkaConfigStorage;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * 
+ * Distributed "herder" that coordinates with other workers to spread work 
across multiple processes.
+ * 
+ * 
+ * Under the hood, this is implemented as a group managed by Kafka's group 
membership facilities (i.e. the generalized
+ * group/consumer coordinator). Each instance of DistributedHerder joins 
the group and indicates what its current
+ * configuration state is (where it is in the configuration log). The 
group coordinator selects one member to take
+ * this information and assign each instance a subset of the active 
connectors & tasks to execute. This assignment
+ * is currently performed in a simple round-robin fashion, but this is not 
guaranteed -- the herder may also choose
+ * to, e.g., use a sticky assignment to avoid the usual start/stop costs 
associated with connectors and tasks. Once
+ * an assignment is received, the DistributedHerder simply runs its 
assigned connectors and tasks in a Worker.
+ * 
+ * 
+ * In addition to distributing work, the DistributedHerder uses the leader 
determined during the work assignment
+ * to select a leader for this generation of the group who is responsible 
for other tasks that can only be performed
+ * by a single node at a time. Most importantly, this includes writing 
updated configurations for connectors and tasks,
+ * (and therefore, also for creating, destroying, and scaling up/down 
connectors).
+ * 
+ */
+public class DistributedHerder implements Herder, Runnable {
+private static final Logger log = 
LoggerFactory.getLogger(DistributedHerder.class);
+
+private static 

[19/26] kafka git commit: KAFKA-2774: Rename Copycat to Kafka Connect

2015-11-08 Thread gwenshap
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
--
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
new file mode 100644
index 000..85db168
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -0,0 +1,920 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.errors.AlreadyExistsException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderConnectorContext;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.storage.KafkaConfigStorage;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * 
+ * Distributed "herder" that coordinates with other workers to spread work 
across multiple processes.
+ * 
+ * 
+ * Under the hood, this is implemented as a group managed by Kafka's group 
membership facilities (i.e. the generalized
+ * group/consumer coordinator). Each instance of DistributedHerder joins 
the group and indicates what its current
+ * configuration state is (where it is in the configuration log). The 
group coordinator selects one member to take
+ * this information and assign each instance a subset of the active 
connectors & tasks to execute. This assignment
+ * is currently performed in a simple round-robin fashion, but this is not 
guaranteed -- the herder may also choose
+ * to, e.g., use a sticky assignment to avoid the usual start/stop costs 
associated with connectors and tasks. Once
+ * an assignment is received, the DistributedHerder simply runs its 
assigned connectors and tasks in a Worker.
+ * 
+ * 
+ * In addition to distributing work, the DistributedHerder uses the leader 
determined during the work assignment
+ * to select a leader for this generation of the group who is responsible 
for other tasks that can only be performed
+ * by a single node at a time. Most importantly, this includes writing 
updated configurations for connectors and tasks,
+ * (and therefore, also for creating, destroying, and scaling up/down 
connectors).
+ * 
+ */
+public class DistributedHerder implements Herder, Runnable {
+private static final Logger log = 
LoggerFactory.getLogger(DistributedHerder.class);
+
+private static