http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
new file mode 100644
index 0000000..141e430
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * WorkerTask that uses a SourceTask to ingest data into Kafka.
+ */
+class WorkerSourceTask implements WorkerTask {
+    private static final Logger log = 
LoggerFactory.getLogger(WorkerSourceTask.class);
+
+    private final ConnectorTaskId id;
+    private final SourceTask task;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
+    private KafkaProducer<byte[], byte[]> producer;
+    private WorkerSourceTaskThread workThread;
+    private final OffsetStorageReader offsetReader;
+    private final OffsetStorageWriter offsetWriter;
+    private final WorkerConfig workerConfig;
+    private final Time time;
+
+    // Use IdentityHashMap to ensure correctness with duplicate records. This 
is a HashMap because
+    // there is no IdentityHashSet.
+    private IdentityHashMap<ProducerRecord<byte[], byte[]>, 
ProducerRecord<byte[], byte[]>> outstandingMessages;
+    // A second buffer is used while an offset flush is running
+    private IdentityHashMap<ProducerRecord<byte[], byte[]>, 
ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog;
+    private boolean flushing;
+
+    public WorkerSourceTask(ConnectorTaskId id, SourceTask task,
+                            Converter keyConverter, Converter valueConverter,
+                            KafkaProducer<byte[], byte[]> producer,
+                            OffsetStorageReader offsetReader, 
OffsetStorageWriter offsetWriter,
+                            WorkerConfig workerConfig, Time time) {
+        this.id = id;
+        this.task = task;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.producer = producer;
+        this.offsetReader = offsetReader;
+        this.offsetWriter = offsetWriter;
+        this.workerConfig = workerConfig;
+        this.time = time;
+
+        this.outstandingMessages = new IdentityHashMap<>();
+        this.outstandingMessagesBacklog = new IdentityHashMap<>();
+        this.flushing = false;
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id, 
props);
+        workThread.start();
+    }
+
+    @Override
+    public void stop() {
+        if (workThread != null)
+            workThread.startGracefulShutdown();
+    }
+
+    @Override
+    public boolean awaitStop(long timeoutMs) {
+        boolean success = true;
+        if (workThread != null) {
+            try {
+                success = workThread.awaitShutdown(timeoutMs, 
TimeUnit.MILLISECONDS);
+                if (!success)
+                    workThread.forceShutdown();
+            } catch (InterruptedException e) {
+                success = false;
+            }
+        }
+        return success;
+    }
+
+    @Override
+    public void close() {
+        // Nothing to do
+    }
+
+    /**
+     * Send a batch of records. This is atomic up to the point of getting the messages into the
+     * Producer and recording them in our set of outstanding messages, so either all or none will be sent.
+     * @param records the batch of records to send
+     */
+    private synchronized void sendRecords(List<SourceRecord> records) {
+        for (SourceRecord record : records) {
+            byte[] key = keyConverter.fromConnectData(record.topic(), 
record.keySchema(), record.key());
+            byte[] value = valueConverter.fromConnectData(record.topic(), 
record.valueSchema(), record.value());
+            final ProducerRecord<byte[], byte[]> producerRecord = new 
ProducerRecord<>(record.topic(), record.kafkaPartition(), key, value);
+            log.trace("Appending record with key {}, value {}", record.key(), 
record.value());
+            if (!flushing) {
+                outstandingMessages.put(producerRecord, producerRecord);
+            } else {
+                outstandingMessagesBacklog.put(producerRecord, producerRecord);
+            }
+            producer.send(
+                    producerRecord,
+                    new Callback() {
+                        @Override
+                        public void onCompletion(RecordMetadata 
recordMetadata, Exception e) {
+                            if (e != null) {
+                                log.error("Failed to send record: ", e);
+                            } else {
+                                log.trace("Wrote record successfully: topic {} 
partition {} offset {}",
+                                        recordMetadata.topic(), 
recordMetadata.partition(),
+                                        recordMetadata.offset());
+                            }
+                            recordSent(producerRecord);
+                        }
+                    });
+            // Offsets are converted & serialized in the OffsetWriter
+            offsetWriter.offset(record.sourcePartition(), 
record.sourceOffset());
+        }
+    }
+
+    private synchronized void recordSent(final ProducerRecord<byte[], byte[]> 
record) {
+        ProducerRecord<byte[], byte[]> removed = 
outstandingMessages.remove(record);
+        // While flushing, we may also see callbacks for items in the backlog
+        if (removed == null && flushing)
+            removed = outstandingMessagesBacklog.remove(record);
+        // But if neither one had it, something is very wrong
+        if (removed == null) {
+            log.error("Saw callback for record that was not present in the 
outstanding message set: "
+                    + "{}", record);
+        } else if (flushing && outstandingMessages.isEmpty()) {
+            // flush thread may be waiting on the outstanding messages to clear
+            this.notifyAll();
+        }
+    }
+
+    public boolean commitOffsets() {
+        long commitTimeoutMs = 
workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
+
+        long started = time.milliseconds();
+        long timeout = started + commitTimeoutMs;
+
+        synchronized (this) {
+            // First we need to make sure we snapshot everything in exactly 
the current state. This
+            // means both the current set of messages we're still waiting to 
finish, stored in this
+            // class, which setting flushing = true will handle by storing any 
new values into a new
+            // buffer; and the current set of user-specified offsets, stored 
in the
+            // OffsetStorageWriter, for which we can use beginFlush() to 
initiate the snapshot.
+            flushing = true;
+            boolean flushStarted = offsetWriter.beginFlush();
+            // Still wait for any producer records to flush, even if there 
aren't any offsets to write
+            // to persistent storage
+
+            // Next we need to wait for all outstanding messages to finish 
sending
+            while (!outstandingMessages.isEmpty()) {
+                try {
+                    long timeoutMs = timeout - time.milliseconds();
+                    if (timeoutMs <= 0) {
+                        log.error(
+                                "Failed to flush {}, timed out while waiting 
for producer to flush outstanding "
+                                        + "messages", this.toString());
+                        finishFailedFlush();
+                        return false;
+                    }
+                    this.wait(timeoutMs);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+
+            if (!flushStarted) {
+                // There was nothing in the offsets to process, but we still 
waited for the data in the
+                // buffer to flush. This is useful since this can feed into 
metrics to monitor, e.g.
+                // flush time, which can be used for monitoring even if the 
connector doesn't record any
+                // offsets.
+                finishSuccessfulFlush();
+                log.debug("Finished {} offset commitOffsets successfully in {} 
ms",
+                        this, time.milliseconds() - started);
+                return true;
+            }
+        }
+
+        // Now we can actually flush the offsets to user storage.
+        Future<Void> flushFuture = offsetWriter.doFlush(new 
org.apache.kafka.connect.util.Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                if (error != null) {
+                    log.error("Failed to flush {} offsets to storage: ", this, 
error);
+                } else {
+                    log.trace("Finished flushing {} offsets to storage", this);
+                }
+            }
+        });
+        // Very rare case: offsets were unserializable and we finished 
immediately, unable to store
+        // any data
+        if (flushFuture == null) {
+            finishFailedFlush();
+            return false;
+        }
+        try {
+            flushFuture.get(Math.max(timeout - time.milliseconds(), 0), 
TimeUnit.MILLISECONDS);
+            // There's a small race here where we can get the callback just as 
this times out (and log
+            // success), but then catch the exception below and cancel 
everything. This won't cause any
+            // errors, is only wasteful in this minor edge case, and the worst 
result is that the log
+            // could look a little confusing.
+        } catch (InterruptedException e) {
+            log.warn("Flush of {} offsets interrupted, cancelling", this);
+            finishFailedFlush();
+            return false;
+        } catch (ExecutionException e) {
+            log.error("Flush of {} offsets threw an unexpected exception: ", 
this, e);
+            finishFailedFlush();
+            return false;
+        } catch (TimeoutException e) {
+            log.error("Timed out waiting to flush {} offsets to storage", 
this);
+            finishFailedFlush();
+            return false;
+        }
+
+        finishSuccessfulFlush();
+        log.debug("Finished {} commitOffsets successfully in {} ms",
+                this, time.milliseconds() - started);
+        return true;
+    }
+
+    private synchronized void finishFailedFlush() {
+        offsetWriter.cancelFlush();
+        outstandingMessages.putAll(outstandingMessagesBacklog);
+        outstandingMessagesBacklog.clear();
+        flushing = false;
+    }
+
+    private void finishSuccessfulFlush() {
+        // If we were successful, we can just swap instead of replacing items 
back into the original map
+        IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], 
byte[]>> temp = outstandingMessages;
+        outstandingMessages = outstandingMessagesBacklog;
+        outstandingMessagesBacklog = temp;
+        flushing = false;
+    }
+
+
+    private class WorkerSourceTaskThread extends ShutdownableThread {
+        private Map<String, String> workerProps;
+        private boolean finishedStart;
+        private boolean startedShutdownBeforeStartCompleted;
+
+        public WorkerSourceTaskThread(String name, Map<String, String> 
workerProps) {
+            super(name);
+            this.workerProps = workerProps;
+            this.finishedStart = false;
+            this.startedShutdownBeforeStartCompleted = false;
+        }
+
+        @Override
+        public void execute() {
+            try {
+                task.initialize(new WorkerSourceTaskContext(offsetReader));
+                task.start(workerProps);
+                log.info("Source task {} finished initialization and start", 
this);
+                synchronized (this) {
+                    if (startedShutdownBeforeStartCompleted) {
+                        task.stop();
+                        return;
+                    }
+                    finishedStart = true;
+                }
+
+                while (getRunning()) {
+                    List<SourceRecord> records = task.poll();
+                    if (records == null)
+                        continue;
+                    sendRecords(records);
+                }
+            } catch (InterruptedException e) {
+                // Ignore and allow to exit.
+            } catch (Throwable t) {
+                log.error("Task {} threw an uncaught and unrecoverable 
exception", id);
+                log.error("Task is being killed and will not recover until 
manually restarted:", t);
+                // It should still be safe to let this fall through and commit 
offsets since this exception would have
+                // simply resulted in not getting more records but all the 
existing records should be ok to flush
+                // and commit offsets. Worst case, task.flush() will also 
throw an exception causing the offset commit
+                // to fail.
+            }
+
+            commitOffsets();
+        }
+
+        @Override
+        public void startGracefulShutdown() {
+            super.startGracefulShutdown();
+            synchronized (this) {
+                if (finishedStart)
+                    task.stop();
+                else
+                    startedShutdownBeforeStartCompleted = true;
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "WorkerSourceTask{" +
+                "id=" + id +
+                '}';
+    }
+}
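
A minimal sketch (not part of the commit) of how a Worker-style caller might drive the lifecycle above; the WorkerSourceTask instance and its configuration map are assumed to be constructed elsewhere, and the sketch must sit in the org.apache.kafka.connect.runtime package because the class is package-private.

    // Hypothetical driver -- illustrates the lifecycle contract only.
    package org.apache.kafka.connect.runtime;

    import java.util.Map;

    class WorkerSourceTaskLifecycleSketch {
        static void runOnce(WorkerSourceTask task, Map<String, String> taskProps) {
            task.start(taskProps);                    // spawns WorkerSourceTaskThread, which polls the SourceTask
            boolean committed = task.commitOffsets(); // flush outstanding records, then flush offsets
            task.stop();                              // non-blocking: requests graceful shutdown
            boolean stopped = task.awaitStop(5000);   // wait up to 5s, forcing shutdown on timeout
            task.close();                             // currently a no-op for source tasks
            System.out.println("committed=" + committed + ", stopped=" + stopped);
        }
    }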

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTaskContext.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTaskContext.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTaskContext.java
new file mode 100644
index 0000000..ac7d077
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTaskContext.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+
+public class WorkerSourceTaskContext implements SourceTaskContext {
+
+    private final OffsetStorageReader reader;
+
+    public WorkerSourceTaskContext(OffsetStorageReader reader) {
+        this.reader = reader;
+    }
+
+    @Override
+    public OffsetStorageReader offsetStorageReader() {
+        return reader;
+    }
+}
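
A hedged sketch of how a SourceTask implementation might use the context installed here to recover its last committed offset on startup; the partition key "filename" and offset key "position" are hypothetical, not defined by this commit.

    // Inside a SourceTask implementation's start(Map<String, String> props), where `context`
    // is the WorkerSourceTaskContext set via initialize(). Requires java.util.Collections and java.util.Map.
    Map<String, Object> partition = Collections.<String, Object>singletonMap("filename", "test.txt");
    Map<String, Object> lastOffset = context.offsetStorageReader().offset(partition);
    Long position = (lastOffset == null) ? null : (Long) lastOffset.get("position");
    // resume reading from `position`, or from the beginning if it is null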

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
new file mode 100644
index 0000000..66fc45b
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import java.util.Map;
+
+/**
+ * Handles processing for an individual task. This interface only provides the 
basic methods
+ * used by {@link Worker} to manage the tasks. Implementations combine a 
user-specified Task with
+ * Kafka to create a data flow.
+ */
+interface WorkerTask {
+    /**
+     * Start the Task
+     * @param props initial configuration
+     */
+    void start(Map<String, String> props);
+
+    /**
+     * Stop this task from processing messages. This method does not block, it 
only triggers
+     * shutdown. Use {@link #awaitStop} to block until completion.
+     */
+    void stop();
+
+    /**
+     * Wait for this task to finish stopping.
+     *
+     * @param timeoutMs the maximum time to wait for the task to stop, in milliseconds
+     * @return true if successful, false if the timeout was reached
+     */
+    boolean awaitStop(long timeoutMs);
+
+    /**
+     * Close this task. This is different from {@link #stop} and {@link #awaitStop} in that the
+     * stop methods ensure processing has stopped but may leave resources 
allocated. This method
+     * should clean up all resources.
+     */
+    void close();
+}
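
A hedged, minimal implementation of this contract (placed in the same package, since the interface is package-private) to illustrate the intended semantics: stop() only signals, awaitStop() blocks with a timeout, and close() releases resources.

    package org.apache.kafka.connect.runtime;

    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    class NoOpWorkerTask implements WorkerTask {
        private final CountDownLatch stopped = new CountDownLatch(1);

        @Override
        public void start(Map<String, String> props) {
            // a real implementation would spawn its work thread here
        }

        @Override
        public void stop() {
            stopped.countDown(); // only triggers shutdown; must not block
        }

        @Override
        public boolean awaitStop(long timeoutMs) {
            try {
                return stopped.await(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                return false;
            }
        }

        @Override
        public void close() {
            // release any resources still held after stop
        }
    }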

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
new file mode 100644
index 0000000..098872c
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * An immutable snapshot of the configuration state of connectors and tasks in 
a Kafka Connect cluster.
+ */
+public class ClusterConfigState {
+    public static final ClusterConfigState EMPTY = new ClusterConfigState(-1, 
Collections.<String, Integer>emptyMap(),
+            Collections.<String, Map<String, String>>emptyMap(), 
Collections.<ConnectorTaskId, Map<String, String>>emptyMap(),
+            Collections.<String>emptySet());
+
+    private final long offset;
+    private final Map<String, Integer> connectorTaskCounts;
+    private final Map<String, Map<String, String>> connectorConfigs;
+    private final Map<ConnectorTaskId, Map<String, String>> taskConfigs;
+    private final Set<String> inconsistentConnectors;
+
+    public ClusterConfigState(long offset,
+                              Map<String, Integer> connectorTaskCounts,
+                              Map<String, Map<String, String>> 
connectorConfigs,
+                              Map<ConnectorTaskId, Map<String, String>> 
taskConfigs,
+                              Set<String> inconsistentConnectors) {
+        this.offset = offset;
+        this.connectorTaskCounts = connectorTaskCounts;
+        this.connectorConfigs = connectorConfigs;
+        this.taskConfigs = taskConfigs;
+        this.inconsistentConnectors = inconsistentConnectors;
+    }
+
+    /**
+     * Get the last offset read to generate this config state. This offset is 
not guaranteed to be perfectly consistent
+     * with the recorded state because some partial updates to task configs 
may have been read.
+     * @return the latest config offset
+     */
+    public long offset() {
+        return offset;
+    }
+
+    /**
+     * Get the set of connectors in this configuration.
+     */
+    public Set<String> connectors() {
+        return connectorConfigs.keySet();
+    }
+
+    /**
+     * Get the configuration for a connector.
+     * @param connector name of the connector
+     * @return a map containing configuration parameters
+     */
+    public Map<String, String> connectorConfig(String connector) {
+        return connectorConfigs.get(connector);
+    }
+
+    /**
+     * Get the configuration for a task.
+     * @param task id of the task
+     * @return a map containing configuration parameters
+     */
+    public Map<String, String> taskConfig(ConnectorTaskId task) {
+        return taskConfigs.get(task);
+    }
+
+    /**
+     * Get the number of tasks assigned to the given connector.
+     * @param connectorName name of the connector to look up tasks for
+     * @return the number of tasks
+     */
+    public int taskCount(String connectorName) {
+        Integer count = connectorTaskCounts.get(connectorName);
+        return count == null ? 0 : count;
+    }
+
+    /**
+     * Get the current set of task IDs for the specified connector.
+     * @param connectorName the name of the connector to look up task configs 
for
+     * @return the current set of connector task IDs
+     */
+    public List<ConnectorTaskId> tasks(String connectorName) {
+        if (inconsistentConnectors.contains(connectorName))
+            return Collections.emptyList();
+
+        Integer numTasks = connectorTaskCounts.get(connectorName);
+        if (numTasks == null)
+            return Collections.emptyList();
+
+        List<ConnectorTaskId> taskIds = new ArrayList<>();
+        for (int taskIndex = 0; taskIndex < numTasks; taskIndex++) {
+            ConnectorTaskId taskId = new ConnectorTaskId(connectorName, 
taskIndex);
+            taskIds.add(taskId);
+        }
+        return taskIds;
+    }
+
+    /**
+     * Get the set of connectors which have inconsistent data in this 
snapshot. These inconsistencies can occur due to
+     * partially completed writes combined with log compaction.
+     *
+     * Connectors in this set will appear in the output of {@link 
#connectors()} since their connector configuration is
+     * available, but not in the output of {@link 
#taskConfig(ConnectorTaskId)} since the task configs are incomplete.
+     *
+     * When a worker detects a connector in this state, it should request that 
the connector regenerate its task
+     * configurations.
+     *
+     * @return the set of inconsistent connectors
+     */
+    public Set<String> inconsistentConnectors() {
+        return inconsistentConnectors;
+    }
+
+    @Override
+    public String toString() {
+        return "ClusterConfigState{" +
+                "offset=" + offset +
+                ", connectorTaskCounts=" + connectorTaskCounts +
+                ", connectorConfigs=" + connectorConfigs +
+                ", taskConfigs=" + taskConfigs +
+                ", inconsistentConnectors=" + inconsistentConnectors +
+                '}';
+    }
+}
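
A hedged sketch of how a worker might walk a snapshot like this to decide what to run; the `state` instance is assumed to come from the config storage layer, which is outside this file.

    // Requires org.apache.kafka.connect.util.ConnectorTaskId and java.util.Map.
    for (String connector : state.connectors()) {
        Map<String, String> connectorConfig = state.connectorConfig(connector);
        for (ConnectorTaskId taskId : state.tasks(connector)) {
            Map<String, String> taskConfig = state.taskConfig(taskId);
            // hand connectorConfig / taskConfig to whatever will run them
        }
    }
    if (!state.inconsistentConnectors().isEmpty()) {
        // these connectors should regenerate their task configs before their tasks are run
    }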

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java
new file mode 100644
index 0000000..971873f
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class implements the protocol for Kafka Connect workers in a group. It 
includes the format of worker state used when
+ * joining the group and distributing assignments, and the format of 
assignments of connectors and tasks to workers.
+ */
+public class ConnectProtocol {
+    public static final String VERSION_KEY_NAME = "version";
+    public static final String URL_KEY_NAME = "url";
+    public static final String CONFIG_OFFSET_KEY_NAME = "config-offset";
+    public static final String CONNECTOR_KEY_NAME = "connector";
+    public static final String LEADER_KEY_NAME = "leader";
+    public static final String LEADER_URL_KEY_NAME = "leader-url";
+    public static final String ERROR_KEY_NAME = "error";
+    public static final String TASKS_KEY_NAME = "tasks";
+    public static final String ASSIGNMENT_KEY_NAME = "assignment";
+    public static final int CONNECTOR_TASK = -1;
+
+    public static final short CONNECT_PROTOCOL_V0 = 0;
+    public static final Schema CONNECT_PROTOCOL_HEADER_SCHEMA = new Schema(
+            new Field(VERSION_KEY_NAME, Type.INT16));
+    private static final Struct CONNECT_PROTOCOL_HEADER_V0 = new 
Struct(CONNECT_PROTOCOL_HEADER_SCHEMA)
+            .set(VERSION_KEY_NAME, CONNECT_PROTOCOL_V0);
+
+    public static final Schema CONFIG_STATE_V0 = new Schema(
+            new Field(URL_KEY_NAME, Type.STRING),
+            new Field(CONFIG_OFFSET_KEY_NAME, Type.INT64));
+
+    // Assignments for each worker are a set of connectors and tasks. These 
are categorized by connector ID. A sentinel
+    // task ID (CONNECTOR_TASK) is used to indicate the connector itself (i.e. 
that the assignment includes
+    // responsibility for running the Connector instance in addition to any 
tasks it generates).
+    public static final Schema CONNECTOR_ASSIGNMENT_V0 = new Schema(
+            new Field(CONNECTOR_KEY_NAME, Type.STRING),
+            new Field(TASKS_KEY_NAME, new ArrayOf(Type.INT32)));
+    public static final Schema ASSIGNMENT_V0 = new Schema(
+            new Field(ERROR_KEY_NAME, Type.INT16),
+            new Field(LEADER_KEY_NAME, Type.STRING),
+            new Field(LEADER_URL_KEY_NAME, Type.STRING),
+            new Field(CONFIG_OFFSET_KEY_NAME, Type.INT64),
+            new Field(ASSIGNMENT_KEY_NAME, new 
ArrayOf(CONNECTOR_ASSIGNMENT_V0)));
+
+    public static ByteBuffer serializeMetadata(WorkerState workerState) {
+        Struct struct = new Struct(CONFIG_STATE_V0);
+        struct.set(URL_KEY_NAME, workerState.url());
+        struct.set(CONFIG_OFFSET_KEY_NAME, workerState.offset());
+        ByteBuffer buffer = 
ByteBuffer.allocate(CONNECT_PROTOCOL_HEADER_V0.sizeOf() + 
CONFIG_STATE_V0.sizeOf(struct));
+        CONNECT_PROTOCOL_HEADER_V0.writeTo(buffer);
+        CONFIG_STATE_V0.write(buffer, struct);
+        buffer.flip();
+        return buffer;
+    }
+
+    public static WorkerState deserializeMetadata(ByteBuffer buffer) {
+        Struct header = (Struct) CONNECT_PROTOCOL_HEADER_SCHEMA.read(buffer);
+        Short version = header.getShort(VERSION_KEY_NAME);
+        checkVersionCompatibility(version);
+        Struct struct = (Struct) CONFIG_STATE_V0.read(buffer);
+        long configOffset = struct.getLong(CONFIG_OFFSET_KEY_NAME);
+        String url = struct.getString(URL_KEY_NAME);
+        return new WorkerState(url, configOffset);
+    }
+
+    public static ByteBuffer serializeAssignment(Assignment assignment) {
+        Struct struct = new Struct(ASSIGNMENT_V0);
+        struct.set(ERROR_KEY_NAME, assignment.error());
+        struct.set(LEADER_KEY_NAME, assignment.leader());
+        struct.set(LEADER_URL_KEY_NAME, assignment.leaderUrl());
+        struct.set(CONFIG_OFFSET_KEY_NAME, assignment.offset());
+        List<Struct> taskAssignments = new ArrayList<>();
+        for (Map.Entry<String, List<Integer>> connectorEntry : 
assignment.asMap().entrySet()) {
+            Struct taskAssignment = new Struct(CONNECTOR_ASSIGNMENT_V0);
+            taskAssignment.set(CONNECTOR_KEY_NAME, connectorEntry.getKey());
+            List<Integer> tasks = connectorEntry.getValue();
+            taskAssignment.set(TASKS_KEY_NAME, tasks.toArray());
+            taskAssignments.add(taskAssignment);
+        }
+        struct.set(ASSIGNMENT_KEY_NAME, taskAssignments.toArray());
+
+        ByteBuffer buffer = 
ByteBuffer.allocate(CONNECT_PROTOCOL_HEADER_V0.sizeOf() + 
ASSIGNMENT_V0.sizeOf(struct));
+        CONNECT_PROTOCOL_HEADER_V0.writeTo(buffer);
+        ASSIGNMENT_V0.write(buffer, struct);
+        buffer.flip();
+        return buffer;
+    }
+
+    public static Assignment deserializeAssignment(ByteBuffer buffer) {
+        Struct header = (Struct) CONNECT_PROTOCOL_HEADER_SCHEMA.read(buffer);
+        Short version = header.getShort(VERSION_KEY_NAME);
+        checkVersionCompatibility(version);
+        Struct struct = (Struct) ASSIGNMENT_V0.read(buffer);
+        short error = struct.getShort(ERROR_KEY_NAME);
+        String leader = struct.getString(LEADER_KEY_NAME);
+        String leaderUrl = struct.getString(LEADER_URL_KEY_NAME);
+        long offset = struct.getLong(CONFIG_OFFSET_KEY_NAME);
+        List<String> connectorIds = new ArrayList<>();
+        List<ConnectorTaskId> taskIds = new ArrayList<>();
+        for (Object structObj : struct.getArray(ASSIGNMENT_KEY_NAME)) {
+            Struct assignment = (Struct) structObj;
+            String connector = assignment.getString(CONNECTOR_KEY_NAME);
+            for (Object taskIdObj : assignment.getArray(TASKS_KEY_NAME)) {
+                Integer taskId = (Integer) taskIdObj;
+                if (taskId == CONNECTOR_TASK)
+                    connectorIds.add(connector);
+                else
+                    taskIds.add(new ConnectorTaskId(connector, taskId));
+            }
+        }
+        return new Assignment(error, leader, leaderUrl, offset, connectorIds, 
taskIds);
+    }
+
+    public static class WorkerState {
+        private final String url;
+        private final long offset;
+
+        public WorkerState(String url, long offset) {
+            this.url = url;
+            this.offset = offset;
+        }
+
+        public String url() {
+            return url;
+        }
+
+        public long offset() {
+            return offset;
+        }
+
+        @Override
+        public String toString() {
+            return "WorkerState{" +
+                    "url='" + url + '\'' +
+                    ", offset=" + offset +
+                    '}';
+        }
+    }
+
+    public static class Assignment {
+        public static final short NO_ERROR = 0;
+        // Configuration offsets mismatched in a way that the leader could not 
resolve. Workers should read to the end
+        // of the config log and try to re-join
+        public static final short CONFIG_MISMATCH = 1;
+
+        private final short error;
+        private final String leader;
+        private final String leaderUrl;
+        private final long offset;
+        private final List<String> connectorIds;
+        private final List<ConnectorTaskId> taskIds;
+
+        /**
+         * Create an assignment indicating responsibility for the given 
connector instances and task Ids.
+         * @param connectorIds list of connectors that the worker should 
instantiate and run
+         * @param taskIds list of task IDs that the worker should instantiate 
and run
+         */
+        public Assignment(short error, String leader, String leaderUrl, long 
configOffset,
+                          List<String> connectorIds, List<ConnectorTaskId> 
taskIds) {
+            this.error = error;
+            this.leader = leader;
+            this.leaderUrl = leaderUrl;
+            this.offset = configOffset;
+            this.taskIds = taskIds;
+            this.connectorIds = connectorIds;
+        }
+
+        public short error() {
+            return error;
+        }
+
+        public String leader() {
+            return leader;
+        }
+
+        public String leaderUrl() {
+            return leaderUrl;
+        }
+
+        public boolean failed() {
+            return error != NO_ERROR;
+        }
+
+        public long offset() {
+            return offset;
+        }
+
+        public List<String> connectors() {
+            return connectorIds;
+        }
+
+        public List<ConnectorTaskId> tasks() {
+            return taskIds;
+        }
+
+        @Override
+        public String toString() {
+            return "Assignment{" +
+                    "error=" + error +
+                    ", leader='" + leader + '\'' +
+                    ", leaderUrl='" + leaderUrl + '\'' +
+                    ", offset=" + offset +
+                    ", connectorIds=" + connectorIds +
+                    ", taskIds=" + taskIds +
+                    '}';
+        }
+
+        private Map<String, List<Integer>> asMap() {
+            // Using LinkedHashMap preserves the ordering, which is helpful 
for tests and debugging
+            Map<String, List<Integer>> taskMap = new LinkedHashMap<>();
+            for (String connectorId : new HashSet<>(connectorIds)) {
+                List<Integer> connectorTasks = taskMap.get(connectorId);
+                if (connectorTasks == null) {
+                    connectorTasks = new ArrayList<>();
+                    taskMap.put(connectorId, connectorTasks);
+                }
+                connectorTasks.add(CONNECTOR_TASK);
+            }
+            for (ConnectorTaskId taskId : taskIds) {
+                String connectorId = taskId.connector();
+                List<Integer> connectorTasks = taskMap.get(connectorId);
+                if (connectorTasks == null) {
+                    connectorTasks = new ArrayList<>();
+                    taskMap.put(connectorId, connectorTasks);
+                }
+                connectorTasks.add(taskId.task());
+            }
+            return taskMap;
+        }
+    }
+
+    private static void checkVersionCompatibility(short version) {
+        // check for invalid versions
+        if (version < CONNECT_PROTOCOL_V0)
+            throw new SchemaException("Unsupported subscription version: " + 
version);
+
+        // otherwise, assume versions can be parsed as V0
+    }
+
+}
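
A hedged round trip through the V0 wire format defined above; the worker URL, config offset, connector name, and task id are made-up values used only to exercise the serializers.

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
    import org.apache.kafka.connect.util.ConnectorTaskId;

    class ConnectProtocolRoundTripSketch {
        public static void main(String[] args) {
            ConnectProtocol.WorkerState sent = new ConnectProtocol.WorkerState("http://worker-1:8083", 42L);
            ByteBuffer metadata = ConnectProtocol.serializeMetadata(sent);
            ConnectProtocol.WorkerState received = ConnectProtocol.deserializeMetadata(metadata);

            ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(
                    ConnectProtocol.Assignment.NO_ERROR, "worker-1", "http://worker-1:8083", 42L,
                    Arrays.asList("my-connector"),
                    Arrays.asList(new ConnectorTaskId("my-connector", 0)));
            ByteBuffer assignmentBuf = ConnectProtocol.serializeAssignment(assignment);
            ConnectProtocol.Assignment decoded = ConnectProtocol.deserializeAssignment(assignmentBuf);

            System.out.println(received + " / " + decoded);
        }
    }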

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
new file mode 100644
index 0000000..989f8c5
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+
+public class DistributedConfig extends WorkerConfig {
+    private static final ConfigDef CONFIG;
+
+    /*
+     * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES 
AS
+     * THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
+     */
+
+    /**
+     * <code>group.id</code>
+     */
+    public static final String GROUP_ID_CONFIG = "group.id";
+    private static final String GROUP_ID_DOC = "A unique string that 
identifies the Connect cluster group this worker belongs to.";
+
+    /**
+     * <code>session.timeout.ms</code>
+     */
+    public static final String SESSION_TIMEOUT_MS_CONFIG = 
"session.timeout.ms";
+    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to 
detect failures when using Kafka's group management facilities.";
+
+    /**
+     * <code>heartbeat.interval.ms</code>
+     */
+    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
"heartbeat.interval.ms";
+    private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time 
between heartbeats to the group coordinator when using Kafka's group management 
facilities. Heartbeats are used to ensure that the worker's session stays 
active and to facilitate rebalancing when new members join or leave the group. 
The value must be set lower than <code>session.timeout.ms</code>, but typically 
should be set no higher than 1/3 of that value. It can be adjusted even lower 
to control the expected time for normal rebalances.";
+
+    /**
+     * <code>worker.sync.timeout.ms</code>
+     */
+    public static final String WORKER_SYNC_TIMEOUT_MS_CONFIG = 
"worker.sync.timeout.ms";
+    private static final String WORKER_SYNC_TIMEOUT_MS_DOC = "When the worker 
is out of sync with other workers and needs" +
+            " to resynchronize configurations, wait up to this amount of time 
before giving up, leaving the group, and" +
+            " waiting a backoff period before rejoining.";
+
+    /**
+     * <code>worker.unsync.backoff.ms</code>
+     */
+    public static final String WORKER_UNSYNC_BACKOFF_MS_CONFIG = 
"worker.unsync.backoff.ms";
+    private static final String WORKER_UNSYNC_BACKOFF_MS_DOC = "When the worker is out of sync with other workers and " +
+            "fails to catch up within worker.sync.timeout.ms, leave the Connect cluster for this long before rejoining.";
+    public static final int WORKER_UNSYNC_BACKOFF_MS_DEFAULT = 5 * 60 * 1000;
+
+    static {
+        CONFIG = baseConfigDef()
+                .define(GROUP_ID_CONFIG, ConfigDef.Type.STRING, 
ConfigDef.Importance.HIGH, GROUP_ID_DOC)
+                .define(SESSION_TIMEOUT_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        30000,
+                        ConfigDef.Importance.HIGH,
+                        SESSION_TIMEOUT_MS_DOC)
+                .define(HEARTBEAT_INTERVAL_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        3000,
+                        ConfigDef.Importance.HIGH,
+                        HEARTBEAT_INTERVAL_MS_DOC)
+                .define(CommonClientConfigs.METADATA_MAX_AGE_CONFIG,
+                        ConfigDef.Type.LONG,
+                        5 * 60 * 1000,
+                        atLeast(0),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.METADATA_MAX_AGE_DOC)
+                .define(CommonClientConfigs.CLIENT_ID_CONFIG,
+                        ConfigDef.Type.STRING,
+                        "",
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.CLIENT_ID_DOC)
+                .define(CommonClientConfigs.SEND_BUFFER_CONFIG,
+                        ConfigDef.Type.INT,
+                        128 * 1024,
+                        atLeast(0),
+                        ConfigDef.Importance.MEDIUM,
+                        CommonClientConfigs.SEND_BUFFER_DOC)
+                .define(CommonClientConfigs.RECEIVE_BUFFER_CONFIG,
+                        ConfigDef.Type.INT,
+                        32 * 1024,
+                        atLeast(0),
+                        ConfigDef.Importance.MEDIUM,
+                        CommonClientConfigs.RECEIVE_BUFFER_DOC)
+                .define(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        50L,
+                        atLeast(0L),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
+                .define(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        100L,
+                        atLeast(0L),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
+                .define(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        30000,
+                        atLeast(0),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
+                .define(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG,
+                        ConfigDef.Type.INT,
+                        2,
+                        atLeast(1),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
+                .define(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
+                        ConfigDef.Type.LIST,
+                        "",
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
ConfigDef.Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, 
ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+                .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, 
ConfigDef.Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, 
ConfigDef.Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
+                .define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, 
SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, 
SslConfigs.SSL_PROTOCOL_DOC)
+                .define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, 
null, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC)
+                .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, 
ConfigDef.Type.LIST, null, ConfigDef.Importance.LOW, 
SslConfigs.SSL_CIPHER_SUITES_DOC)
+                .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, 
ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, 
ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
+                .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, 
ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, 
ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
+                .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 
ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, 
SslConfigs.SSL_KEYSTORE_LOCATION_DOC)
+                .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 
ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, 
SslConfigs.SSL_KEYSTORE_PASSWORD_DOC)
+                .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, 
ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, 
SslConfigs.SSL_KEY_PASSWORD_DOC)
+                .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, 
ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, 
ConfigDef.Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
+                .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, 
SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC)
+                .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, 
SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC)
+                .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, 
ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, 
ConfigDef.Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
+                .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, 
ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, 
ConfigDef.Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
+                
.define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, 
ConfigDef.Type.STRING, null,  ConfigDef.Importance.LOW, 
SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, 
ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, 
SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, 
ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, 
ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, 
ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, 
ConfigDef.Importance.LOW, 
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, 
ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, 
ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, 
ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, 
ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, 
ConfigDef.Type.LIST, 
SaslConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, 
ConfigDef.Importance.MEDIUM, 
SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC)
+                .define(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        40 * 1000,
+                        atLeast(0),
+                        ConfigDef.Importance.MEDIUM,
+                        CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
+                        /* default is set to be a bit lower than the server 
default (10 min), to avoid both client and server closing connection at same 
time */
+                .define(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        9 * 60 * 1000,
+                        ConfigDef.Importance.MEDIUM,
+                        CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
+                .define(WORKER_SYNC_TIMEOUT_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        3000,
+                        ConfigDef.Importance.MEDIUM,
+                        WORKER_SYNC_TIMEOUT_MS_DOC)
+                .define(WORKER_UNSYNC_BACKOFF_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        WORKER_UNSYNC_BACKOFF_MS_DEFAULT,
+                        ConfigDef.Importance.MEDIUM,
+                        WORKER_UNSYNC_BACKOFF_MS_DOC);
+    }
+
+    public DistributedConfig(Map<String, String> props) {
+        super(CONFIG, props);
+    }
+
+}
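
A hedged sketch of constructing this config from worker properties; the values are placeholders, and depending on what the base WorkerConfig requires (its definition is not part of this hunk) additional keys, such as the key and value converter classes, may also be mandatory.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.connect.runtime.distributed.DistributedConfig;

    class DistributedConfigSketch {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("bootstrap.servers", "localhost:9092"); // assumed base WorkerConfig key
            props.put(DistributedConfig.GROUP_ID_CONFIG, "connect-cluster");
            props.put(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
            props.put(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
            DistributedConfig config = new DistributedConfig(props);
            System.out.println(config.getInt(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG));
        }
    }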
