http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/generated/WorkerToken.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/WorkerToken.java 
b/storm-client/src/jvm/org/apache/storm/generated/WorkerToken.java
new file mode 100644
index 0000000..917efbd
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/generated/WorkerToken.java
@@ -0,0 +1,638 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.3)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.storm.generated;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)")
+public class WorkerToken implements org.apache.thrift.TBase<WorkerToken, 
WorkerToken._Fields>, java.io.Serializable, Cloneable, Comparable<WorkerToken> {
+  // Thrift struct descriptor carrying this struct's name.
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new 
org.apache.thrift.protocol.TStruct("WorkerToken");
+
+  // One descriptor per field: field name, Thrift type and numeric field id.
+  private static final org.apache.thrift.protocol.TField 
SERVICE_TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("serviceType", 
org.apache.thrift.protocol.TType.I32, (short)1);
+  private static final org.apache.thrift.protocol.TField INFO_FIELD_DESC = new 
org.apache.thrift.protocol.TField("info", 
org.apache.thrift.protocol.TType.STRING, (short)2);
+  private static final org.apache.thrift.protocol.TField SIGNATURE_FIELD_DESC 
= new org.apache.thrift.protocol.TField("signature", 
org.apache.thrift.protocol.TType.STRING, (short)3);
+
+  // Scheme factories keyed by scheme class; read() and write() pick the
+  // factory that matches the scheme reported by the protocol they are given.
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = 
new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new WorkerTokenStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new WorkerTokenTupleSchemeFactory());
+  }
+
+  // Field storage. A field counts as "set" exactly when its reference is
+  // non-null (see the is_set_* methods).
+  private WorkerTokenServiceType serviceType; // required
+  private ByteBuffer info; // required
+  private ByteBuffer signature; // required
+
+  /**
+   * Enumerates the fields of this struct, pairing each Thrift field id with its
+   * field name and offering lookups by either.
+   */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    /**
+     * 
+     * @see WorkerTokenServiceType
+     */
+    SERVICE_TYPE((short)1, "serviceType"),
+    INFO((short)2, "info"),
+    SIGNATURE((short)3, "signature");
+
+    // Field name -> constant index, filled once when the enum is initialized.
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields candidate : values()) {
+        byName.put(candidate._fieldName, candidate);
+      }
+    }
+
+    /**
+     * Looks up the _Fields constant for a Thrift field id; returns null when
+     * the id is unknown.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      if (fieldId == 1) {
+        return SERVICE_TYPE;
+      }
+      if (fieldId == 2) {
+        return INFO;
+      }
+      if (fieldId == 3) {
+        return SIGNATURE;
+      }
+      return null;
+    }
+
+    /**
+     * Looks up the _Fields constant for a Thrift field id, throwing
+     * IllegalArgumentException when the id is unknown.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields match = findByThriftId(fieldId);
+      if (match != null) {
+        return match;
+      }
+      throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+    }
+
+    /**
+     * Looks up the _Fields constant by field name; returns null when no field
+     * has that name.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      this._thriftId = thriftId;
+      this._fieldName = fieldName;
+    }
+
+    /** Returns the numeric Thrift id of this field. */
+    public short getThriftFieldId() {
+      return this._thriftId;
+    }
+
+    /** Returns the name of this field. */
+    public String getFieldName() {
+      return this._fieldName;
+    }
+  }
+
+  // isset id assignments
+  /** Unmodifiable per-field metadata (name, requirement level, value type). */
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> fieldMeta =
+        new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+
+    // serviceType: required enum backed by WorkerTokenServiceType.
+    org.apache.thrift.meta_data.FieldValueMetaData serviceTypeValue =
+        new org.apache.thrift.meta_data.EnumMetaData(
+            org.apache.thrift.protocol.TType.ENUM, WorkerTokenServiceType.class);
+    fieldMeta.put(_Fields.SERVICE_TYPE,
+        new org.apache.thrift.meta_data.FieldMetaData("serviceType",
+            org.apache.thrift.TFieldRequirementType.REQUIRED, serviceTypeValue));
+
+    // info: required STRING-typed value, constructed with the second argument true.
+    org.apache.thrift.meta_data.FieldValueMetaData infoValue =
+        new org.apache.thrift.meta_data.FieldValueMetaData(
+            org.apache.thrift.protocol.TType.STRING, true);
+    fieldMeta.put(_Fields.INFO,
+        new org.apache.thrift.meta_data.FieldMetaData("info",
+            org.apache.thrift.TFieldRequirementType.REQUIRED, infoValue));
+
+    // signature: same shape as info.
+    org.apache.thrift.meta_data.FieldValueMetaData signatureValue =
+        new org.apache.thrift.meta_data.FieldValueMetaData(
+            org.apache.thrift.protocol.TType.STRING, true);
+    fieldMeta.put(_Fields.SIGNATURE,
+        new org.apache.thrift.meta_data.FieldMetaData("signature",
+            org.apache.thrift.TFieldRequirementType.REQUIRED, signatureValue));
+
+    metaDataMap = Collections.unmodifiableMap(fieldMeta);
+    // Register the map so it can be found by struct class.
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(WorkerToken.class, metaDataMap);
+  }
+
+  /** Creates an empty token; serviceType, info and signature all start out unset (null). */
+  public WorkerToken() {
+  }
+
+  /**
+   * Creates a token with all three fields supplied.
+   *
+   * @param serviceType service type, stored as given
+   * @param info binary payload; the result of {@code TBaseHelper.copyBinary} is stored
+   * @param signature binary signature; the result of {@code TBaseHelper.copyBinary} is stored
+   */
+  public WorkerToken(
+    WorkerTokenServiceType serviceType,
+    ByteBuffer info,
+    ByteBuffer signature)
+  {
+    this();
+    final ByteBuffer infoCopy = org.apache.thrift.TBaseHelper.copyBinary(info);
+    final ByteBuffer signatureCopy = org.apache.thrift.TBaseHelper.copyBinary(signature);
+    this.serviceType = serviceType;
+    this.info = infoCopy;
+    this.signature = signatureCopy;
+  }
+
+  /**
+   * Copy constructor: takes over every field that is set on <i>other</i>,
+   * passing the binary fields through {@code TBaseHelper.copyBinary}.
+   */
+  public WorkerToken(WorkerToken other) {
+    if (other.is_set_serviceType()) {
+      serviceType = other.serviceType;
+    }
+
+    if (other.is_set_info()) {
+      ByteBuffer infoCopy = org.apache.thrift.TBaseHelper.copyBinary(other.info);
+      info = infoCopy;
+    }
+
+    if (other.is_set_signature()) {
+      ByteBuffer signatureCopy = org.apache.thrift.TBaseHelper.copyBinary(other.signature);
+      signature = signatureCopy;
+    }
+  }
+
+  /** Returns a new WorkerToken produced by the copy constructor. */
+  public WorkerToken deepCopy() {
+    final WorkerToken copy = new WorkerToken(this);
+    return copy;
+  }
+
+  /** Resets the struct so that every field is unset (null). */
+  @Override
+  public void clear() {
+    serviceType = null;
+    info = null;
+    signature = null;
+  }
+
+  /**
+   * Returns the service type, or null when it is unset.
+   *
+   * @see WorkerTokenServiceType
+   */
+  public WorkerTokenServiceType get_serviceType() {
+    return serviceType;
+  }
+
+  /**
+   * Stores the given service type; storing null leaves the field unset.
+   *
+   * @see WorkerTokenServiceType
+   */
+  public void set_serviceType(WorkerTokenServiceType serviceType) {
+    this.serviceType = serviceType;
+  }
+
+  /** Marks serviceType as unset by nulling it. */
+  public void unset_serviceType() {
+    serviceType = null;
+  }
+
+  /** Reports whether serviceType currently holds a non-null value. */
+  public boolean is_set_serviceType() {
+    return serviceType != null;
+  }
+
+  /**
+   * Clears serviceType when {@code value} is false. A true {@code value}
+   * changes nothing, because presence is tracked by the reference itself.
+   */
+  public void set_serviceType_isSet(boolean value) {
+    if (value) {
+      return;
+    }
+    serviceType = null;
+  }
+
+  /**
+   * Returns info as a byte array, or null when it is unset. Note that this
+   * getter first re-stores the field via {@code set_info(TBaseHelper.rightSize(info))}
+   * and then hands back the stored buffer's backing array.
+   */
+  public byte[] get_info() {
+    set_info(org.apache.thrift.TBaseHelper.rightSize(info));
+    if (info == null) {
+      return null;
+    }
+    return info.array();
+  }
+
+  /** Returns the result of {@code TBaseHelper.copyBinary} applied to info. */
+  public ByteBuffer buffer_for_info() {
+    ByteBuffer copy = org.apache.thrift.TBaseHelper.copyBinary(info);
+    return copy;
+  }
+
+  /** Stores a buffer wrapping a copy of the given array; null unsets the field. */
+  public void set_info(byte[] info) {
+    if (info == null) {
+      this.info = (ByteBuffer)null;
+    } else {
+      this.info = ByteBuffer.wrap(Arrays.copyOf(info, info.length));
+    }
+  }
+
+  /** Stores the result of {@code TBaseHelper.copyBinary} applied to the given buffer. */
+  public void set_info(ByteBuffer info) {
+    ByteBuffer copy = org.apache.thrift.TBaseHelper.copyBinary(info);
+    this.info = copy;
+  }
+
+  /** Marks info as unset by nulling it. */
+  public void unset_info() {
+    info = null;
+  }
+
+  /** Reports whether info currently holds a non-null value. */
+  public boolean is_set_info() {
+    return info != null;
+  }
+
+  /**
+   * Clears info when {@code value} is false. A true {@code value} changes
+   * nothing, because presence is tracked by the reference itself.
+   */
+  public void set_info_isSet(boolean value) {
+    if (value) {
+      return;
+    }
+    info = null;
+  }
+
+  /**
+   * Returns signature as a byte array, or null when it is unset. Note that this
+   * getter first re-stores the field via {@code set_signature(TBaseHelper.rightSize(signature))}
+   * and then hands back the stored buffer's backing array.
+   */
+  public byte[] get_signature() {
+    set_signature(org.apache.thrift.TBaseHelper.rightSize(signature));
+    if (signature == null) {
+      return null;
+    }
+    return signature.array();
+  }
+
+  /** Returns the result of {@code TBaseHelper.copyBinary} applied to signature. */
+  public ByteBuffer buffer_for_signature() {
+    ByteBuffer copy = org.apache.thrift.TBaseHelper.copyBinary(signature);
+    return copy;
+  }
+
+  /** Stores a buffer wrapping a copy of the given array; null unsets the field. */
+  public void set_signature(byte[] signature) {
+    if (signature == null) {
+      this.signature = (ByteBuffer)null;
+    } else {
+      this.signature = ByteBuffer.wrap(Arrays.copyOf(signature, signature.length));
+    }
+  }
+
+  /** Stores the result of {@code TBaseHelper.copyBinary} applied to the given buffer. */
+  public void set_signature(ByteBuffer signature) {
+    ByteBuffer copy = org.apache.thrift.TBaseHelper.copyBinary(signature);
+    this.signature = copy;
+  }
+
+  /** Marks signature as unset by nulling it. */
+  public void unset_signature() {
+    signature = null;
+  }
+
+  /** Reports whether signature currently holds a non-null value. */
+  public boolean is_set_signature() {
+    return signature != null;
+  }
+
+  /**
+   * Clears signature when {@code value} is false. A true {@code value} changes
+   * nothing, because presence is tracked by the reference itself.
+   */
+  public void set_signature_isSet(boolean value) {
+    if (value) {
+      return;
+    }
+    signature = null;
+  }
+
+  /**
+   * Sets or clears a field generically.
+   *
+   * <p>A null {@code value} unsets the field. The binary fields (info and
+   * signature) accept either a {@code ByteBuffer} or a {@code byte[]}. The
+   * latter is what {@link #getFieldValue(_Fields)} returns for them, so a value
+   * obtained from getFieldValue can be passed straight back in instead of
+   * failing with a ClassCastException.
+   *
+   * @param field the field to modify
+   * @param value the new value, or null to unset the field
+   * @throws ClassCastException if {@code value} has a type the field does not accept
+   */
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case SERVICE_TYPE:
+      if (value == null) {
+        unset_serviceType();
+      } else {
+        set_serviceType((WorkerTokenServiceType)value);
+      }
+      break;
+
+    case INFO:
+      if (value == null) {
+        unset_info();
+      } else if (value instanceof byte[]) {
+        // Symmetric with getFieldValue(INFO), which returns a byte[].
+        set_info((byte[])value);
+      } else {
+        set_info((ByteBuffer)value);
+      }
+      break;
+
+    case SIGNATURE:
+      if (value == null) {
+        unset_signature();
+      } else if (value instanceof byte[]) {
+        // Symmetric with getFieldValue(SIGNATURE), which returns a byte[].
+        set_signature((byte[])value);
+      } else {
+        set_signature((ByteBuffer)value);
+      }
+      break;
+
+    }
+  }
+
+  /**
+   * Returns the value of the given field through its typed getter. The binary
+   * fields come back as whatever get_info() / get_signature() return (a byte[]).
+   */
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case SERVICE_TYPE:
+      return get_serviceType();
+    case INFO:
+      return get_info();
+    case SIGNATURE:
+      return get_signature();
+    default:
+      throw new IllegalStateException();
+    }
+  }
+
+  /**
+   * Reports whether the given field has been assigned a value.
+   *
+   * @throws IllegalArgumentException if {@code field} is null
+   */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case SERVICE_TYPE:
+      return is_set_serviceType();
+    case INFO:
+      return is_set_info();
+    case SIGNATURE:
+      return is_set_signature();
+    default:
+      throw new IllegalStateException();
+    }
+  }
+
+  /** Equal only to another WorkerToken that matches field by field. */
+  @Override
+  public boolean equals(Object that) {
+    // instanceof is false for null, so no separate null check is needed.
+    return (that instanceof WorkerToken) && this.equals((WorkerToken)that);
+  }
+
+  /**
+   * Field-wise equality: for each field, both sides must agree on whether it
+   * is set, and when it is set the two values must be equal.
+   */
+  public boolean equals(WorkerToken that) {
+    if (that == null) {
+      return false;
+    }
+
+    boolean mineServiceType = this.is_set_serviceType();
+    boolean theirsServiceType = that.is_set_serviceType();
+    if (mineServiceType != theirsServiceType) {
+      return false;
+    }
+    if (mineServiceType && !this.serviceType.equals(that.serviceType)) {
+      return false;
+    }
+
+    boolean mineInfo = this.is_set_info();
+    boolean theirsInfo = that.is_set_info();
+    if (mineInfo != theirsInfo) {
+      return false;
+    }
+    if (mineInfo && !this.info.equals(that.info)) {
+      return false;
+    }
+
+    boolean mineSignature = this.is_set_signature();
+    boolean theirsSignature = that.is_set_signature();
+    if (mineSignature != theirsSignature) {
+      return false;
+    }
+    if (mineSignature && !this.signature.equals(that.signature)) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Hashes a list holding, for each field in order, its presence flag followed
+   * by its value when present (the enum contributes its numeric value).
+   */
+  @Override
+  public int hashCode() {
+    List<Object> parts = new ArrayList<Object>();
+
+    boolean hasServiceType = is_set_serviceType();
+    parts.add(hasServiceType);
+    if (hasServiceType) {
+      parts.add(serviceType.getValue());
+    }
+
+    boolean hasInfo = is_set_info();
+    parts.add(hasInfo);
+    if (hasInfo) {
+      parts.add(info);
+    }
+
+    boolean hasSignature = is_set_signature();
+    parts.add(hasSignature);
+    if (hasSignature) {
+      parts.add(signature);
+    }
+
+    return parts.hashCode();
+  }
+
+  /**
+   * Orders tokens field by field (serviceType, then info, then signature).
+   * For each field the presence flags are compared first, and the values only
+   * when this side has the field set. Objects of a different class are ordered
+   * by class name.
+   */
+  @Override
+  public int compareTo(WorkerToken other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int order = Boolean.valueOf(is_set_serviceType()).compareTo(other.is_set_serviceType());
+    if (order == 0 && is_set_serviceType()) {
+      order = org.apache.thrift.TBaseHelper.compareTo(this.serviceType, other.serviceType);
+    }
+    if (order != 0) {
+      return order;
+    }
+
+    order = Boolean.valueOf(is_set_info()).compareTo(other.is_set_info());
+    if (order == 0 && is_set_info()) {
+      order = org.apache.thrift.TBaseHelper.compareTo(this.info, other.info);
+    }
+    if (order != 0) {
+      return order;
+    }
+
+    order = Boolean.valueOf(is_set_signature()).compareTo(other.is_set_signature());
+    if (order == 0 && is_set_signature()) {
+      order = org.apache.thrift.TBaseHelper.compareTo(this.signature, other.signature);
+    }
+    return order;
+  }
+
+  /** Maps a Thrift field id to its _Fields constant; null when the id is unknown. */
+  public _Fields fieldForId(int fieldId) {
+    final _Fields match = _Fields.findByThriftId(fieldId);
+    return match;
+  }
+
+  /** Populates this struct from iprot using the scheme registered for the protocol's scheme class. */
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    SchemeFactory factory = schemes.get(iprot.getScheme());
+    factory.getScheme().read(iprot, this);
+  }
+
+  /** Serializes this struct to oprot using the scheme registered for the protocol's scheme class. */
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    SchemeFactory factory = schemes.get(oprot.getScheme());
+    factory.getScheme().write(oprot, this);
+  }
+
+  /**
+   * Renders the struct as {@code WorkerToken(serviceType:..., info:..., signature:...)}.
+   * Unset fields print as {@code null}; the binary fields are formatted by
+   * {@code TBaseHelper.toString}.
+   */
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("WorkerToken(");
+
+    text.append("serviceType:");
+    if (this.serviceType != null) {
+      text.append(this.serviceType);
+    } else {
+      text.append("null");
+    }
+
+    text.append(", ");
+    text.append("info:");
+    if (this.info != null) {
+      org.apache.thrift.TBaseHelper.toString(this.info, text);
+    } else {
+      text.append("null");
+    }
+
+    text.append(", ");
+    text.append("signature:");
+    if (this.signature != null) {
+      org.apache.thrift.TBaseHelper.toString(this.signature, text);
+    } else {
+      text.append("null");
+    }
+
+    text.append(")");
+    return text.toString();
+  }
+
+  /**
+   * Verifies that every required field is set, checking serviceType, info and
+   * signature in that order.
+   *
+   * @throws org.apache.thrift.protocol.TProtocolException naming the first unset field
+   */
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    if (!is_set_serviceType()) {
+      String message = "Required field 'serviceType' is unset! Struct:" + toString();
+      throw new org.apache.thrift.protocol.TProtocolException(message);
+    }
+
+    if (!is_set_info()) {
+      String message = "Required field 'info' is unset! Struct:" + toString();
+      throw new org.apache.thrift.protocol.TProtocolException(message);
+    }
+
+    if (!is_set_signature()) {
+      String message = "Required field 'signature' is unset! Struct:" + toString();
+      throw new org.apache.thrift.protocol.TProtocolException(message);
+    }
+
+    // check for sub-struct validity
+  }
+
+  /**
+   * Java-serialization hook: writes this struct to the stream with
+   * TCompactProtocol, rethrowing any TException as an IOException.
+   */
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      org.apache.thrift.protocol.TProtocol compact = new org.apache.thrift.protocol.TCompactProtocol(
+          new org.apache.thrift.transport.TIOStreamTransport(out));
+      write(compact);
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  /**
+   * Java-serialization hook: reads this struct from the stream with
+   * TCompactProtocol, rethrowing any TException as an IOException.
+   */
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      org.apache.thrift.protocol.TProtocol compact = new org.apache.thrift.protocol.TCompactProtocol(
+          new org.apache.thrift.transport.TIOStreamTransport(in));
+      read(compact);
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  /** Factory that hands out a fresh WorkerTokenStandardScheme on every call. */
+  private static class WorkerTokenStandardSchemeFactory implements SchemeFactory {
+    public WorkerTokenStandardScheme getScheme() {
+      final WorkerTokenStandardScheme scheme = new WorkerTokenStandardScheme();
+      return scheme;
+    }
+  }
+
+  /** Field-tagged (de)serialization of WorkerToken: each field is written with its descriptor. */
+  private static class WorkerTokenStandardScheme extends StandardScheme<WorkerToken> {
+
+    /**
+     * Reads fields from iprot into struct until the STOP marker, skipping any
+     * field whose id or wire type is not recognised, then validates the struct.
+     */
+    public void read(org.apache.thrift.protocol.TProtocol iprot, WorkerToken struct) throws org.apache.thrift.TException {
+      iprot.readStructBegin();
+      for (org.apache.thrift.protocol.TField schemeField = iprot.readFieldBegin();
+           schemeField.type != org.apache.thrift.protocol.TType.STOP;
+           schemeField = iprot.readFieldBegin()) {
+        switch (schemeField.id) {
+          case 1: // SERVICE_TYPE
+            if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+              struct.serviceType = org.apache.storm.generated.WorkerTokenServiceType.findByValue(iprot.readI32());
+              struct.set_serviceType_isSet(true);
+            } else {
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 2: // INFO
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.info = iprot.readBinary();
+              struct.set_info_isSet(true);
+            } else {
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 3: // SIGNATURE
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.signature = iprot.readBinary();
+              struct.set_signature_isSet(true);
+            } else {
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            break;
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+      struct.validate();
+    }
+
+    /**
+     * Validates struct, then writes each non-null field with its descriptor,
+     * followed by the field-stop marker.
+     */
+    public void write(org.apache.thrift.protocol.TProtocol oprot, WorkerToken struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+
+      if (struct.serviceType != null) {
+        oprot.writeFieldBegin(SERVICE_TYPE_FIELD_DESC);
+        oprot.writeI32(struct.serviceType.getValue());
+        oprot.writeFieldEnd();
+      }
+
+      if (struct.info != null) {
+        oprot.writeFieldBegin(INFO_FIELD_DESC);
+        oprot.writeBinary(struct.info);
+        oprot.writeFieldEnd();
+      }
+
+      if (struct.signature != null) {
+        oprot.writeFieldBegin(SIGNATURE_FIELD_DESC);
+        oprot.writeBinary(struct.signature);
+        oprot.writeFieldEnd();
+      }
+
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  /** Factory that hands out a fresh WorkerTokenTupleScheme on every call. */
+  private static class WorkerTokenTupleSchemeFactory implements SchemeFactory {
+    public WorkerTokenTupleScheme getScheme() {
+      final WorkerTokenTupleScheme scheme = new WorkerTokenTupleScheme();
+      return scheme;
+    }
+  }
+
+  /**
+   * Positional (de)serialization of WorkerToken: the three fields are written
+   * and read back in a fixed order without field descriptors.
+   */
+  private static class WorkerTokenTupleScheme extends TupleScheme<WorkerToken> {
+
+    /**
+     * Writes serviceType, info and signature in that order. serviceType is
+     * dereferenced unconditionally, so it must be non-null here.
+     */
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, WorkerToken struct) throws org.apache.thrift.TException {
+      TTupleProtocol tupleOut = (TTupleProtocol) prot;
+      int serviceTypeValue = struct.serviceType.getValue();
+      tupleOut.writeI32(serviceTypeValue);
+      tupleOut.writeBinary(struct.info);
+      tupleOut.writeBinary(struct.signature);
+    }
+
+    /** Reads serviceType, info and signature in the order write() emits them. */
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, WorkerToken struct) throws org.apache.thrift.TException {
+      TTupleProtocol tupleIn = (TTupleProtocol) prot;
+
+      int serviceTypeValue = tupleIn.readI32();
+      struct.serviceType = org.apache.storm.generated.WorkerTokenServiceType.findByValue(serviceTypeValue);
+      struct.set_serviceType_isSet(true);
+
+      struct.info = tupleIn.readBinary();
+      struct.set_info_isSet(true);
+
+      struct.signature = tupleIn.readBinary();
+      struct.set_signature_isSet(true);
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/generated/WorkerTokenInfo.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/generated/WorkerTokenInfo.java 
b/storm-client/src/jvm/org/apache/storm/generated/WorkerTokenInfo.java
new file mode 100644
index 0000000..7d876ae
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/generated/WorkerTokenInfo.java
@@ -0,0 +1,701 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.3)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.storm.generated;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)")
+public class WorkerTokenInfo implements 
org.apache.thrift.TBase<WorkerTokenInfo, WorkerTokenInfo._Fields>, 
java.io.Serializable, Cloneable, Comparable<WorkerTokenInfo> {
+  // Thrift struct descriptor carrying this struct's name.
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new 
org.apache.thrift.protocol.TStruct("WorkerTokenInfo");
+
+  // One descriptor per field: field name, Thrift type and numeric field id.
+  private static final org.apache.thrift.protocol.TField USER_NAME_FIELD_DESC 
= new org.apache.thrift.protocol.TField("userName", 
org.apache.thrift.protocol.TType.STRING, (short)1);
+  private static final org.apache.thrift.protocol.TField 
TOPOLOGY_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("topologyId", 
org.apache.thrift.protocol.TType.STRING, (short)2);
+  private static final org.apache.thrift.protocol.TField 
SECRET_VERSION_FIELD_DESC = new 
org.apache.thrift.protocol.TField("secretVersion", 
org.apache.thrift.protocol.TType.I64, (short)3);
+  private static final org.apache.thrift.protocol.TField 
EXPIRATION_TIME_MILLIS_FIELD_DESC = new 
org.apache.thrift.protocol.TField("expirationTimeMillis", 
org.apache.thrift.protocol.TType.I64, (short)4);
+
+  // Scheme factories keyed by scheme class.
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = 
new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new 
WorkerTokenInfoStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new WorkerTokenInfoTupleSchemeFactory());
+  }
+
+  // Field storage. The String fields count as "set" when non-null; the two
+  // long fields track presence through bits in __isset_bitfield.
+  private String userName; // required
+  private String topologyId; // required
+  private long secretVersion; // required
+  private long expirationTimeMillis; // required
+
+  /**
+   * Enumerates the fields of this struct, pairing each Thrift field id with its
+   * field name and offering lookups by either.
+   */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    USER_NAME((short)1, "userName"),
+    TOPOLOGY_ID((short)2, "topologyId"),
+    SECRET_VERSION((short)3, "secretVersion"),
+    EXPIRATION_TIME_MILLIS((short)4, "expirationTimeMillis");
+
+    // Field name -> constant index, filled once when the enum is initialized.
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields candidate : values()) {
+        byName.put(candidate._fieldName, candidate);
+      }
+    }
+
+    /**
+     * Looks up the _Fields constant for a Thrift field id; returns null when
+     * the id is unknown.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      if (fieldId == 1) {
+        return USER_NAME;
+      }
+      if (fieldId == 2) {
+        return TOPOLOGY_ID;
+      }
+      if (fieldId == 3) {
+        return SECRET_VERSION;
+      }
+      if (fieldId == 4) {
+        return EXPIRATION_TIME_MILLIS;
+      }
+      return null;
+    }
+
+    /**
+     * Looks up the _Fields constant for a Thrift field id, throwing
+     * IllegalArgumentException when the id is unknown.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields match = findByThriftId(fieldId);
+      if (match != null) {
+        return match;
+      }
+      throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+    }
+
+    /**
+     * Looks up the _Fields constant by field name; returns null when no field
+     * has that name.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      this._thriftId = thriftId;
+      this._fieldName = fieldName;
+    }
+
+    /** Returns the numeric Thrift id of this field. */
+    public short getThriftFieldId() {
+      return this._thriftId;
+    }
+
+    /** Returns the name of this field. */
+    public String getFieldName() {
+      return this._fieldName;
+    }
+  }
+
+  // isset id assignments
+  // Bit positions inside __isset_bitfield for the two primitive (long) fields.
+  private static final int __SECRETVERSION_ISSET_ID = 0;
+  private static final int __EXPIRATIONTIMEMILLIS_ISSET_ID = 1;
+  private byte __isset_bitfield = 0;
+  /** Unmodifiable per-field metadata (name, requirement level, value type). */
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> fieldMeta =
+        new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+
+    // The two String fields.
+    fieldMeta.put(_Fields.USER_NAME,
+        new org.apache.thrift.meta_data.FieldMetaData("userName",
+            org.apache.thrift.TFieldRequirementType.REQUIRED,
+            new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    fieldMeta.put(_Fields.TOPOLOGY_ID,
+        new org.apache.thrift.meta_data.FieldMetaData("topologyId",
+            org.apache.thrift.TFieldRequirementType.REQUIRED,
+            new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+
+    // The two 64-bit integer fields.
+    fieldMeta.put(_Fields.SECRET_VERSION,
+        new org.apache.thrift.meta_data.FieldMetaData("secretVersion",
+            org.apache.thrift.TFieldRequirementType.REQUIRED,
+            new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    fieldMeta.put(_Fields.EXPIRATION_TIME_MILLIS,
+        new org.apache.thrift.meta_data.FieldMetaData("expirationTimeMillis",
+            org.apache.thrift.TFieldRequirementType.REQUIRED,
+            new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+
+    metaDataMap = Collections.unmodifiableMap(fieldMeta);
+    // Register the map so it can be found by struct class.
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(WorkerTokenInfo.class, metaDataMap);
+  }
+
+  /** Creates an empty instance; no field is marked as set. */
+  public WorkerTokenInfo() {
+  }
+
+  /**
+   * Creates an instance with all four fields supplied. The two long fields are
+   * explicitly flagged as set, since a primitive cannot signal presence itself.
+   */
+  public WorkerTokenInfo(
+    String userName,
+    String topologyId,
+    long secretVersion,
+    long expirationTimeMillis)
+  {
+    this();
+    this.userName = userName;
+    this.topologyId = topologyId;
+
+    this.secretVersion = secretVersion;
+    set_secretVersion_isSet(true);
+
+    this.expirationTimeMillis = expirationTimeMillis;
+    set_expirationTimeMillis_isSet(true);
+  }
+
+  /**
+   * Copy constructor: duplicates the isset bitfield and both long values from
+   * <i>other</i>, and takes over each String field that is set on it.
+   */
+  public WorkerTokenInfo(WorkerTokenInfo other) {
+    this.__isset_bitfield = other.__isset_bitfield;
+
+    if (other.is_set_userName()) {
+      userName = other.userName;
+    }
+    if (other.is_set_topologyId()) {
+      topologyId = other.topologyId;
+    }
+
+    secretVersion = other.secretVersion;
+    expirationTimeMillis = other.expirationTimeMillis;
+  }
+
+  /** Returns a new WorkerTokenInfo produced by the copy constructor. */
+  public WorkerTokenInfo deepCopy() {
+    final WorkerTokenInfo copy = new WorkerTokenInfo(this);
+    return copy;
+  }
+
+  /**
+   * Resets the struct: the String fields become null, the long fields become 0
+   * and their isset bits are cleared.
+   */
+  @Override
+  public void clear() {
+    userName = null;
+    topologyId = null;
+
+    set_secretVersion_isSet(false);
+    secretVersion = 0;
+
+    set_expirationTimeMillis_isSet(false);
+    expirationTimeMillis = 0;
+  }
+
+  /** Returns userName, or null when it is unset. */
+  public String get_userName() {
+    return userName;
+  }
+
+  /** Stores the given userName; storing null leaves the field unset. */
+  public void set_userName(String userName) {
+    this.userName = userName;
+  }
+
+  /** Marks userName as unset by nulling it. */
+  public void unset_userName() {
+    userName = null;
+  }
+
+  /** Reports whether userName currently holds a non-null value. */
+  public boolean is_set_userName() {
+    return userName != null;
+  }
+
+  /**
+   * Clears userName when {@code value} is false. A true {@code value} changes
+   * nothing, because presence is tracked by the reference itself.
+   */
+  public void set_userName_isSet(boolean value) {
+    if (value) {
+      return;
+    }
+    userName = null;
+  }
+
+  /** Returns topologyId, or null when it is unset. */
+  public String get_topologyId() {
+    return topologyId;
+  }
+
+  /** Stores the given topologyId; storing null leaves the field unset. */
+  public void set_topologyId(String topologyId) {
+    this.topologyId = topologyId;
+  }
+
+  /** Marks topologyId as unset by nulling it. */
+  public void unset_topologyId() {
+    topologyId = null;
+  }
+
+  /** Reports whether topologyId currently holds a non-null value. */
+  public boolean is_set_topologyId() {
+    return topologyId != null;
+  }
+
+  /**
+   * Clears topologyId when {@code value} is false. A true {@code value} changes
+   * nothing, because presence is tracked by the reference itself.
+   */
+  public void set_topologyId_isSet(boolean value) {
+    if (value) {
+      return;
+    }
+    topologyId = null;
+  }
+
+  /** Returns the stored secretVersion, whether or not its isset bit is set. */
+  public long get_secretVersion() {
+    return secretVersion;
+  }
+
+  /** Stores secretVersion and flags the field as set. */
+  public void set_secretVersion(long secretVersion) {
+    this.secretVersion = secretVersion;
+    set_secretVersion_isSet(true);
+  }
+
+  /** Clears the isset bit for secretVersion; the stored value is left untouched. */
+  public void unset_secretVersion() {
+    byte cleared = EncodingUtils.clearBit(__isset_bitfield, __SECRETVERSION_ISSET_ID);
+    __isset_bitfield = cleared;
+  }
+
+  /** Reports whether the isset bit for secretVersion is set. */
+  public boolean is_set_secretVersion() {
+    boolean present = EncodingUtils.testBit(__isset_bitfield, __SECRETVERSION_ISSET_ID);
+    return present;
+  }
+
+  /** Sets or clears the isset bit for secretVersion according to {@code value}. */
+  public void set_secretVersion_isSet(boolean value) {
+    byte updated = EncodingUtils.setBit(__isset_bitfield, __SECRETVERSION_ISSET_ID, value);
+    __isset_bitfield = updated;
+  }
+
+  public long get_expirationTimeMillis() {
+    return this.expirationTimeMillis;
+  }
+
+  public void set_expirationTimeMillis(long expirationTimeMillis) {
+    this.expirationTimeMillis = expirationTimeMillis;
+    set_expirationTimeMillis_isSet(true);
+  }
+
+  public void unset_expirationTimeMillis() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, 
__EXPIRATIONTIMEMILLIS_ISSET_ID);
+  }
+
+  /** Returns true if field expirationTimeMillis is set (has been assigned a 
value) and false otherwise */
+  public boolean is_set_expirationTimeMillis() {
+    return EncodingUtils.testBit(__isset_bitfield, 
__EXPIRATIONTIMEMILLIS_ISSET_ID);
+  }
+
+  public void set_expirationTimeMillis_isSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, 
__EXPIRATIONTIMEMILLIS_ISSET_ID, value);
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case USER_NAME:
+      if (value == null) {
+        unset_userName();
+      } else {
+        set_userName((String)value);
+      }
+      break;
+
+    case TOPOLOGY_ID:
+      if (value == null) {
+        unset_topologyId();
+      } else {
+        set_topologyId((String)value);
+      }
+      break;
+
+    case SECRET_VERSION:
+      if (value == null) {
+        unset_secretVersion();
+      } else {
+        set_secretVersion((Long)value);
+      }
+      break;
+
+    case EXPIRATION_TIME_MILLIS:
+      if (value == null) {
+        unset_expirationTimeMillis();
+      } else {
+        set_expirationTimeMillis((Long)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case USER_NAME:
+      return get_userName();
+
+    case TOPOLOGY_ID:
+      return get_topologyId();
+
+    case SECRET_VERSION:
+      return get_secretVersion();
+
+    case EXPIRATION_TIME_MILLIS:
+      return get_expirationTimeMillis();
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned 
a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case USER_NAME:
+      return is_set_userName();
+    case TOPOLOGY_ID:
+      return is_set_topologyId();
+    case SECRET_VERSION:
+      return is_set_secretVersion();
+    case EXPIRATION_TIME_MILLIS:
+      return is_set_expirationTimeMillis();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof WorkerTokenInfo)
+      return this.equals((WorkerTokenInfo)that);
+    return false;
+  }
+
+  public boolean equals(WorkerTokenInfo that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_userName = true && this.is_set_userName();
+    boolean that_present_userName = true && that.is_set_userName();
+    if (this_present_userName || that_present_userName) {
+      if (!(this_present_userName && that_present_userName))
+        return false;
+      if (!this.userName.equals(that.userName))
+        return false;
+    }
+
+    boolean this_present_topologyId = true && this.is_set_topologyId();
+    boolean that_present_topologyId = true && that.is_set_topologyId();
+    if (this_present_topologyId || that_present_topologyId) {
+      if (!(this_present_topologyId && that_present_topologyId))
+        return false;
+      if (!this.topologyId.equals(that.topologyId))
+        return false;
+    }
+
+    boolean this_present_secretVersion = true;
+    boolean that_present_secretVersion = true;
+    if (this_present_secretVersion || that_present_secretVersion) {
+      if (!(this_present_secretVersion && that_present_secretVersion))
+        return false;
+      if (this.secretVersion != that.secretVersion)
+        return false;
+    }
+
+    boolean this_present_expirationTimeMillis = true;
+    boolean that_present_expirationTimeMillis = true;
+    if (this_present_expirationTimeMillis || 
that_present_expirationTimeMillis) {
+      if (!(this_present_expirationTimeMillis && 
that_present_expirationTimeMillis))
+        return false;
+      if (this.expirationTimeMillis != that.expirationTimeMillis)
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    List<Object> list = new ArrayList<Object>();
+
+    boolean present_userName = true && (is_set_userName());
+    list.add(present_userName);
+    if (present_userName)
+      list.add(userName);
+
+    boolean present_topologyId = true && (is_set_topologyId());
+    list.add(present_topologyId);
+    if (present_topologyId)
+      list.add(topologyId);
+
+    boolean present_secretVersion = true;
+    list.add(present_secretVersion);
+    if (present_secretVersion)
+      list.add(secretVersion);
+
+    boolean present_expirationTimeMillis = true;
+    list.add(present_expirationTimeMillis);
+    if (present_expirationTimeMillis)
+      list.add(expirationTimeMillis);
+
+    return list.hashCode();
+  }
+
+  @Override
+  public int compareTo(WorkerTokenInfo other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = 
Boolean.valueOf(is_set_userName()).compareTo(other.is_set_userName());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_userName()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.userName, 
other.userName);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = 
Boolean.valueOf(is_set_topologyId()).compareTo(other.is_set_topologyId());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_topologyId()) {
+      lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(this.topologyId, other.topologyId);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = 
Boolean.valueOf(is_set_secretVersion()).compareTo(other.is_set_secretVersion());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_secretVersion()) {
+      lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(this.secretVersion, 
other.secretVersion);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = 
Boolean.valueOf(is_set_expirationTimeMillis()).compareTo(other.is_set_expirationTimeMillis());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_expirationTimeMillis()) {
+      lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(this.expirationTimeMillis, 
other.expirationTimeMillis);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws 
org.apache.thrift.TException {
+    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws 
org.apache.thrift.TException {
+    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("WorkerTokenInfo(");
+    boolean first = true;
+
+    sb.append("userName:");
+    if (this.userName == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.userName);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("topologyId:");
+    if (this.topologyId == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.topologyId);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("secretVersion:");
+    sb.append(this.secretVersion);
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("expirationTimeMillis:");
+    sb.append(this.expirationTimeMillis);
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    if (!is_set_userName()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 
'userName' is unset! Struct:" + toString());
+    }
+
+    if (!is_set_topologyId()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 
'topologyId' is unset! Struct:" + toString());
+    }
+
+    if (!is_set_secretVersion()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 
'secretVersion' is unset! Struct:" + toString());
+    }
+
+    if (!is_set_expirationTimeMillis()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 
'expirationTimeMillis' is unset! Struct:" + toString());
+    }
+
+    // check for sub-struct validity
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws 
java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws 
java.io.IOException, ClassNotFoundException {
+    try {
+      // it doesn't seem like you should have to do this, but java 
serialization is wacky, and doesn't call the default constructor.
+      __isset_bitfield = 0;
+      read(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class WorkerTokenInfoStandardSchemeFactory implements 
SchemeFactory {
+    public WorkerTokenInfoStandardScheme getScheme() {
+      return new WorkerTokenInfoStandardScheme();
+    }
+  }
+
+  private static class WorkerTokenInfoStandardScheme extends 
StandardScheme<WorkerTokenInfo> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, 
WorkerTokenInfo struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // USER_NAME
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.userName = iprot.readString();
+              struct.set_userName_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
+          case 2: // TOPOLOGY_ID
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.topologyId = iprot.readString();
+              struct.set_topologyId_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
+          case 3: // SECRET_VERSION
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.secretVersion = iprot.readI64();
+              struct.set_secretVersion_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
+          case 4: // EXPIRATION_TIME_MILLIS
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.expirationTimeMillis = iprot.readI64();
+              struct.set_expirationTimeMillis_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, 
WorkerTokenInfo struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.userName != null) {
+        oprot.writeFieldBegin(USER_NAME_FIELD_DESC);
+        oprot.writeString(struct.userName);
+        oprot.writeFieldEnd();
+      }
+      if (struct.topologyId != null) {
+        oprot.writeFieldBegin(TOPOLOGY_ID_FIELD_DESC);
+        oprot.writeString(struct.topologyId);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldBegin(SECRET_VERSION_FIELD_DESC);
+      oprot.writeI64(struct.secretVersion);
+      oprot.writeFieldEnd();
+      oprot.writeFieldBegin(EXPIRATION_TIME_MILLIS_FIELD_DESC);
+      oprot.writeI64(struct.expirationTimeMillis);
+      oprot.writeFieldEnd();
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class WorkerTokenInfoTupleSchemeFactory implements 
SchemeFactory {
+    public WorkerTokenInfoTupleScheme getScheme() {
+      return new WorkerTokenInfoTupleScheme();
+    }
+  }
+
+  private static class WorkerTokenInfoTupleScheme extends 
TupleScheme<WorkerTokenInfo> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, 
WorkerTokenInfo struct) throws org.apache.thrift.TException {
+      TTupleProtocol oprot = (TTupleProtocol) prot;
+      oprot.writeString(struct.userName);
+      oprot.writeString(struct.topologyId);
+      oprot.writeI64(struct.secretVersion);
+      oprot.writeI64(struct.expirationTimeMillis);
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, 
WorkerTokenInfo struct) throws org.apache.thrift.TException {
+      TTupleProtocol iprot = (TTupleProtocol) prot;
+      struct.userName = iprot.readString();
+      struct.set_userName_isSet(true);
+      struct.topologyId = iprot.readString();
+      struct.set_topologyId_isSet(true);
+      struct.secretVersion = iprot.readI64();
+      struct.set_secretVersion_isSet(true);
+      struct.expirationTimeMillis = iprot.readI64();
+      struct.set_expirationTimeMillis_isSet(true);
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/generated/WorkerTokenServiceType.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/generated/WorkerTokenServiceType.java 
b/storm-client/src/jvm/org/apache/storm/generated/WorkerTokenServiceType.java
new file mode 100644
index 0000000..99c26a8
--- /dev/null
+++ 
b/storm-client/src/jvm/org/apache/storm/generated/WorkerTokenServiceType.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.3)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.storm.generated;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+public enum WorkerTokenServiceType implements org.apache.thrift.TEnum {
+  NIMBUS(0),
+  DRPC(1);
+
+  private final int value;
+
+  private WorkerTokenServiceType(int value) {
+    this.value = value;
+  }
+
+  /**
+   * Get the integer value of this enum value, as defined in the Thrift IDL.
+   */
+  public int getValue() {
+    return value;
+  }
+
+  /**
+   * Find the enum type by its integer value, as defined in the Thrift IDL.
+   * @return null if the value is not found.
+   */
+  public static WorkerTokenServiceType findByValue(int value) { 
+    switch (value) {
+      case 0:
+        return NIMBUS;
+      case 1:
+        return DRPC;
+      default:
+        return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/AbstractSaslClientCallbackHandler.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/security/auth/AbstractSaslClientCallbackHandler.java
 
b/storm-client/src/jvm/org/apache/storm/security/auth/AbstractSaslClientCallbackHandler.java
deleted file mode 100644
index 04710ba..0000000
--- 
a/storm-client/src/jvm/org/apache/storm/security/auth/AbstractSaslClientCallbackHandler.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.security.auth;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.RealmCallback;
-import java.io.IOException;
-
-public abstract class AbstractSaslClientCallbackHandler implements 
CallbackHandler {
-    protected static final String USERNAME = "username";
-    protected static final String PASSWORD = "password";
-    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractSaslClientCallbackHandler.class);
-    protected String _username = null;
-    protected String _password = null;
-
-    /**
-     * This method is invoked by SASL for authentication challenges
-     * @param callbacks a collection of challenge callbacks
-     */
-    public void handle(Callback[] callbacks) throws IOException, 
UnsupportedCallbackException {
-        for (Callback c : callbacks) {
-            if (c instanceof NameCallback) {
-                LOG.debug("name callback");
-                NameCallback nc = (NameCallback) c;
-                nc.setName(_username);
-            } else if (c instanceof PasswordCallback) {
-                LOG.debug("password callback");
-                PasswordCallback pc = (PasswordCallback)c;
-                if (_password != null) {
-                    pc.setPassword(_password.toCharArray());
-                }
-            } else if (c instanceof AuthorizeCallback) {
-                LOG.debug("authorization callback");
-                AuthorizeCallback ac = (AuthorizeCallback) c;
-                String authid = ac.getAuthenticationID();
-                String authzid = ac.getAuthorizationID();
-                if (authid.equals(authzid)) {
-                    ac.setAuthorized(true);
-                } else {
-                    ac.setAuthorized(false);
-                }
-                if (ac.isAuthorized()) {
-                    ac.setAuthorizedID(authzid);
-                }
-            } else if (c instanceof RealmCallback) {
-                RealmCallback rc = (RealmCallback) c;
-                ((RealmCallback) c).setText(rc.getDefaultText());
-            } else {
-                throw new UnsupportedCallbackException(c);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/AbstractSaslServerCallbackHandler.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/security/auth/AbstractSaslServerCallbackHandler.java
 
b/storm-client/src/jvm/org/apache/storm/security/auth/AbstractSaslServerCallbackHandler.java
deleted file mode 100644
index ebbe2ea..0000000
--- 
a/storm-client/src/jvm/org/apache/storm/security/auth/AbstractSaslServerCallbackHandler.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.security.auth;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.RealmCallback;
-import java.util.HashMap;
-import java.util.Map;
-
-public abstract class AbstractSaslServerCallbackHandler implements 
CallbackHandler {
-    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractSaslServerCallbackHandler.class);
-    protected final Map<String,String> credentials = new HashMap<>();
-    protected String userName;
-
-    public void handle(Callback[] callbacks) throws 
UnsupportedCallbackException {
-        for (Callback callback : callbacks) {
-            if (callback instanceof NameCallback) {
-                handleNameCallback((NameCallback) callback);
-            } else if (callback instanceof PasswordCallback) {
-                handlePasswordCallback((PasswordCallback) callback);
-            } else if (callback instanceof RealmCallback) {
-                handleRealmCallback((RealmCallback) callback);
-            } else if (callback instanceof AuthorizeCallback) {
-                handleAuthorizeCallback((AuthorizeCallback) callback);
-            }
-        }
-    }
-
-    private void handleNameCallback(NameCallback nc) {
-        LOG.debug("handleNameCallback");
-        userName = nc.getDefaultName();
-        nc.setName(nc.getDefaultName());
-    }
-
-    protected void handlePasswordCallback(PasswordCallback pc) {
-        LOG.debug("handlePasswordCallback");
-        if (credentials.containsKey(userName) ) {
-            pc.setPassword(credentials.get(userName).toCharArray());
-        } else {
-            LOG.warn("No password found for user: {}", userName);
-        }
-    }
-
-    private void handleRealmCallback(RealmCallback rc) {
-        LOG.debug("handleRealmCallback: {}", rc.getDefaultText());
-        rc.setText(rc.getDefaultText());
-    }
-
-    private void handleAuthorizeCallback(AuthorizeCallback ac) {
-        String authenticationID = ac.getAuthenticationID();
-        LOG.info("Successfully authenticated client: authenticationID = {} 
authorizationID = {}",
-            authenticationID, ac.getAuthorizationID());
-
-        //if authorizationId is not set, set it to authenticationId.
-        if(ac.getAuthorizationID() == null) {
-            ac.setAuthorizedID(authenticationID);
-        }
-
-        //When authNid and authZid are not equal , authNId is attempting to 
impersonate authZid, We
-        //add the authNid as the real user in reqContext's subject which will 
be used during authorization.
-        if(!authenticationID.equals(ac.getAuthorizationID())) {
-            LOG.info("Impersonation attempt  authenticationID = {} 
authorizationID = {}",
-                ac.getAuthenticationID(),  ac.getAuthorizationID());
-            ReqContext.context().setRealPrincipal(new 
SaslTransportPlugin.User(ac.getAuthenticationID()));
-        } else {
-            ReqContext.context().setRealPrincipal(null);
-        }
-
-        ac.setAuthorized(true);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/AuthUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/AuthUtils.java 
b/storm-client/src/jvm/org/apache/storm/security/auth/AuthUtils.java
index e87c701..0c5d67a 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/AuthUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/AuthUtils.java
@@ -15,35 +15,40 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.security.auth;
 
-import javax.security.auth.kerberos.KerberosTicket;
-import org.apache.storm.Config;
-import javax.security.auth.login.Configuration;
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.Subject;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.security.URIParameter;
+import java.net.URI;
 import java.security.MessageDigest;
-
+import java.security.URIParameter;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.generated.WorkerToken;
+import org.apache.storm.generated.WorkerTokenInfo;
+import org.apache.storm.generated.WorkerTokenServiceType;
 import org.apache.storm.security.INimbusCredentialPlugin;
+import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ReflectionUtils;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Collection;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
 
 public class AuthUtils {
     private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
@@ -265,6 +270,128 @@ public class AuthUtils {
     }
 
     /**
+     * Get the key used to store a WorkerToken in the credentials map
+     * @param type the type of service to get.
+     * @return the key as a String.
+     */
+    public static String workerTokenCredentialsKey(WorkerTokenServiceType 
type) {
+        return "STORM_WORKER_TOKEN_" + type.name();
+    }
+
+    /**
+     * Look up and deserialize the WorkerToken stored in a credentials map for one service type.
+     * @param credentials the credentials map to read from.
+     * @param type the service type whose token is wanted.
+     * @return the deserialized WorkerToken, or null when the map has no entry for that type.
+     */
+    public static WorkerToken readWorkerToken(Map<String,String> credentials, WorkerTokenServiceType type) {
+        final String serialized = credentials.get(workerTokenCredentialsKey(type));
+        if (serialized == null) {
+            // Nothing was stored under this service type's key.
+            return null;
+        }
+        return Utils.deserializeFromString(serialized, WorkerToken.class);
+    }
+
+    /**
+     * Serialize a worker token and put it into a credentials map under the key for its service type.
+     * The token can be read back with readWorkerToken.
+     * @param credentials the credentials map to write to.
+     * @param token the token to store.
+     */
+    public static void setWorkerToken(Map<String,String> credentials, WorkerToken token) {
+        final WorkerTokenServiceType serviceType = token.get_serviceType();
+        final String credentialsKey = workerTokenCredentialsKey(serviceType);
+        final String serializedToken = Utils.serializeToString(token);
+        credentials.put(credentialsKey, serializedToken);
+    }
+
+    /**
+     * Search the private credentials of a subject for a worker token of the requested service type.
+     * @param subject the subject whose private credentials are searched.
+     * @param type the service type the token must have.
+     * @return a matching token, or null when the subject holds none of that type.
+     */
+    public static WorkerToken findWorkerToken(Subject subject, final WorkerTokenServiceType type) {
+        Set<WorkerToken> workerTokens = subject.getPrivateCredentials(WorkerToken.class);
+        // NOTE(review): the Subject javadoc describes this typed view as a freshly created set, so this lock
+        // presumably does not guard the subject's own credential set - confirm before relying on it.
+        synchronized(workerTokens) {
+            for (WorkerToken candidate : workerTokens) {
+                // Service types are enum constants, so an identity comparison is sufficient.
+                if (candidate.get_serviceType() == type) {
+                    return candidate;
+                }
+            }
+            return null;
+        }
+    }
+
+    private static boolean willWorkerTokensBeStoredSecurely(Map<String, 
Object> conf) {
+        boolean overrideZkAuth = 
ObjectReader.getBoolean(conf.get("TESTING.ONLY.ENABLE.INSECURE.WORKER.TOKENS"), 
false);
+        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
+            return true;
+        } else if (overrideZkAuth) {
+            LOG.error("\n\n\t\tYOU HAVE ENABLED INSECURE WORKER TOKENS.  IF 
THIS IS NOT A UNIT TEST PLEASE STOP NOW!!!\n\n");
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Check if worker tokens should be enabled on the server side or not.
+     * @param server the Thrift server, asked whether its transport supports worker tokens; there is no need to
+     *     create a token when the transport cannot use it.
+     * @param conf the daemon configuration, used to be sure the tokens are secure.
+     * @return true if worker tokens can be enabled, else false.
+     */
+    public static boolean areWorkerTokensEnabledServer(ThriftServer server, Map<String, Object> conf) {
+        if (!server.supportsWorkerTokens()) {
+            return false;
+        }
+        return willWorkerTokensBeStoredSecurely(conf);
+    }
+
+    /**
+     * Check if worker tokens should be enabled on the server side or not, for a given type of server.
+     * @param connectionType the type of server this is for; a type whose getWtType() is null never enables tokens.
+     * @param conf the daemon configuration, used to be sure the tokens are secure.
+     * @return true if worker tokens can be enabled, else false.
+     */
+    public static boolean areWorkerTokensEnabledServer(ThriftConnectionType connectionType, Map<String, Object> conf) {
+        if (connectionType.getWtType() == null) {
+            return false;
+        }
+        return willWorkerTokensBeStoredSecurely(conf);
+    }
+
+    /**
+     * Turn a WorkerTokenInfo into a byte array.
+     * @param wti the info to serialize.
+     * @return the byte array produced by Utils.serialize.
+     */
+    public static byte[] serializeWorkerTokenInfo(WorkerTokenInfo wti) {
+        final byte[] serializedInfo = Utils.serialize(wti);
+        return serializedInfo;
+    }
+
+    /**
+     * Get and deserialize the WorkerTokenInfo carried inside a worker token.
+     * The value returned by the token's get_info() is handed to Utils.deserialize.
+     * @param wt the token to read the info from.
+     * @return the deserialized WorkerTokenInfo.
+     */
+    public static WorkerTokenInfo getWorkerTokenInfo(WorkerToken wt) {
+        return Utils.deserialize(wt.get_info(), WorkerTokenInfo.class);
+    }
+
+    /**
+     * Copy the worker tokens found in a credentials map into a subject's private credentials.
+     * This is support for worker tokens, similar to an IAutoCredentials implementation.
+     *
+     * <p>For every WorkerTokenServiceType the matching token, if present in the credentials, is added to the
+     * subject, and a different token of the same service type that the subject already held is removed.
+     * @param subject the subject to update.
+     * @param credentials the credentials map to read tokens from; when null nothing is changed.
+     * @return the subject that was passed in.
+     */
+    private static Subject insertWorkerTokens(Subject subject, Map<String,String> credentials) {
+        if (credentials == null) {
+            return subject;
+        }
+        for (WorkerTokenServiceType type : WorkerTokenServiceType.values()) {
+            WorkerToken token = readWorkerToken(credentials, type);
+            if (token != null) {
+                Set<Object> creds = subject.getPrivateCredentials();
+                // Hold the credential set's lock so the lookup, add and remove happen as one step.
+                synchronized (creds) {
+                    WorkerToken previous = findWorkerToken(subject, type);
+                    creds.add(token);
+                    // Only drop the old token when it differs from the new one. When both are equal the add
+                    // above is a no-op (a set rejects duplicates) and remove() matches by equals(), so an
+                    // unconditional remove would delete the very token that should stay in the subject.
+                    if (previous != null && !previous.equals(token)) {
+                        creds.remove(previous);
+                    }
+                }
+            }
+        }
+        return subject;
+    }
+
+    /**
      * Populate a subject from credentials using the IAutoCredentials.
      * @param subject the subject to populate or null if a new Subject should 
be created.
      * @param autos the IAutoCredentials to call to populate the subject.
@@ -279,7 +406,7 @@ public class AuthUtils {
             for (IAutoCredentials autoCred : autos) {
                 autoCred.populateSubject(subject, credentials);
             }
-            return subject;
+            return insertWorkerTokens(subject, credentials);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -303,12 +430,13 @@ public class AuthUtils {
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
+        insertWorkerTokens(subject, credentials);
     }
 
     /**
      * Construct a transport plugin per storm configuration
      */
-    public static ITransportPlugin GetTransportPlugin(ThriftConnectionType 
type, Map<String, Object> topoConf, Configuration login_conf) {
+    public static ITransportPlugin getTransportPlugin(ThriftConnectionType 
type, Map<String, Object> topoConf, Configuration login_conf) {
         try {
             String transport_plugin_klassName = 
type.getTransportPlugin(topoConf);
             ITransportPlugin transportPlugin = 
ReflectionUtils.newInstance(transport_plugin_klassName);

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java 
b/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
index f51b1e2..8cf52cd 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
@@ -35,9 +35,9 @@ public interface ITransportPlugin {
      * Invoked once immediately after construction
      * @param type the type of connection this will process.
      * @param topoConf Storm configuration 
-     * @param login_conf login configuration
+     * @param loginConf login configuration
      */
-    void prepare(ThriftConnectionType type, Map<String, Object> topoConf, 
Configuration login_conf);
+    void prepare(ThriftConnectionType type, Map<String, Object> topoConf, 
Configuration loginConf);
     
     /**
      * Create a server associated with a given port, service handler, and 
purpose
@@ -59,4 +59,12 @@ public interface ITransportPlugin {
      * @return The port this transport is using. This is not known until 
{@link #getServer(org.apache.thrift.TProcessor)} has been called.
      */
     public int getPort();
+
+    /**
+     * Check if worker tokens are supported by this transport.
+     * The default implementation reports that they are not.
+     * @return true if they are supported, else false.
+     */
+    default boolean areWorkerTokensSupported() {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/ReqContext.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/security/auth/ReqContext.java 
b/storm-client/src/jvm/org/apache/storm/security/auth/ReqContext.java
index 6d92fd9..fd2266f 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ReqContext.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ReqContext.java
@@ -91,6 +91,17 @@ public class ReqContext {
     }
 
     /**
+     * Copy Constructor.
+     * Copies the subject, remote address, request id and real principal from
+     * {@code other} by plain assignment; no referenced object is cloned.
+     * @param other the context to copy from.
+     */
+    @VisibleForTesting
+    public ReqContext(ReqContext other) {
+        _subject = other._subject;
+        _remoteAddr = other._remoteAddr;
+        _reqID = other._reqID;
+        realPrincipal = other.realPrincipal;
+    }
+
+    /**
      * client address
      */
     public void setRemoteAddress(InetAddress addr) {

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/SaslTransportPlugin.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/security/auth/SaslTransportPlugin.java 
b/storm-client/src/jvm/org/apache/storm/security/auth/SaslTransportPlugin.java
deleted file mode 100644
index 5ad3a5a..0000000
--- 
a/storm-client/src/jvm/org/apache/storm/security/auth/SaslTransportPlugin.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.security.auth;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.security.Principal;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import javax.security.auth.Subject;
-import javax.security.auth.login.Configuration;
-import javax.security.sasl.SaslServer;
-
-import org.apache.storm.utils.ExtendedThreadPoolExecutor;
-import org.apache.storm.security.auth.kerberos.NoOpTTrasport;
-import org.apache.thrift.TException;
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TSaslServerTransport;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.transport.TTransportFactory;
-
-/**
- * Base class for SASL authentication plugin.
- */
-public abstract class SaslTransportPlugin implements ITransportPlugin {
-    protected ThriftConnectionType type;
-    protected Map<String, Object> topoConf;
-    protected Configuration login_conf;
-    private int port;
-
-    @Override
-    public void prepare(ThriftConnectionType type, Map<String, Object> 
topoConf, Configuration login_conf) {
-        this.type = type;
-        this.topoConf = topoConf;
-        this.login_conf = login_conf;
-    }
-
-    @Override
-    public TServer getServer(TProcessor processor) throws IOException, 
TTransportException {
-        int configuredPort = type.getPort(topoConf);
-        Integer socketTimeout = type.getSocketTimeOut(topoConf);
-        TTransportFactory serverTransportFactory = getServerTransportFactory();
-        TServerSocket serverTransport = null;
-        if (socketTimeout != null) {
-            serverTransport = new TServerSocket(configuredPort, socketTimeout);
-        } else {
-            serverTransport = new TServerSocket(configuredPort);
-        }
-        this.port = serverTransport.getServerSocket().getLocalPort();
-        int numWorkerThreads = type.getNumThreads(topoConf);
-        Integer queueSize = type.getQueueSize(topoConf);
-
-        TThreadPoolServer.Args server_args = new 
TThreadPoolServer.Args(serverTransport).
-                processor(new TUGIWrapProcessor(processor)).
-                minWorkerThreads(numWorkerThreads).
-                maxWorkerThreads(numWorkerThreads).
-                protocolFactory(new TBinaryProtocol.Factory(false, true));
-
-        if (serverTransportFactory != null) {
-            server_args.transportFactory(serverTransportFactory);
-        }
-        BlockingQueue workQueue = new SynchronousQueue();
-        if (queueSize != null) {
-            workQueue = new ArrayBlockingQueue(queueSize);
-        }
-        ThreadPoolExecutor executorService = new 
ExtendedThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
-            60, TimeUnit.SECONDS, workQueue);
-        server_args.executorService(executorService);
-        return new TThreadPoolServer(server_args);
-    }
-
-    /**
-     * All subclass must implement this method
-     * @return server transport factory
-     * @throws IOException
-     */
-    protected abstract TTransportFactory getServerTransportFactory() throws 
IOException;
-    
-    @Override
-    public int getPort() {
-        return this.port;
-    }
-
-
-    /**
-     * Processor that pulls the SaslServer object out of the transport, and
-     * assumes the remote user's UGI before calling through to the original
-     * processor.
-     *
-     * This is used on the server side to set the UGI for each specific call.
-     */
-    private static class TUGIWrapProcessor implements TProcessor {
-        final TProcessor wrapped;
-
-        TUGIWrapProcessor(TProcessor wrapped) {
-            this.wrapped = wrapped;
-        }
-
-        public boolean process(final TProtocol inProt, final TProtocol 
outProt) throws TException {
-            //populating request context
-            ReqContext req_context = ReqContext.context();
-
-            TTransport trans = inProt.getTransport();
-            //Sasl transport
-            TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
-
-            if(trans instanceof NoOpTTrasport) {
-                return false;
-            }
-
-            //remote address
-            TSocket tsocket = (TSocket)saslTrans.getUnderlyingTransport();
-            Socket socket = tsocket.getSocket();
-            req_context.setRemoteAddress(socket.getInetAddress());
-
-            //remote subject
-            SaslServer saslServer = saslTrans.getSaslServer();
-            String authId = saslServer.getAuthorizationID();
-            Subject remoteUser = new Subject();
-            remoteUser.getPrincipals().add(new User(authId));
-            req_context.setSubject(remoteUser);
-
-            //invoke service handler
-            return wrapped.process(inProt, outProt);
-        }
-    }
-
-    public static class User implements Principal {
-        private final String name;
-
-        public User(String name) {
-            this.name =  name;
-        }
-
-        /**
-         * Get the full name of the user.
-         */
-        public String getName() {
-            return name;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            return !(o == null || getClass() != o.getClass()) && 
(name.equals(((User) o).name));
-        }
-
-        @Override
-        public int hashCode() {
-            return name.hashCode();
-        }
-
-        @Override
-        public String toString() {
-            return name;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
 
b/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
index 698f797..99b0c21 100644
--- 
a/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
+++ 
b/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
@@ -58,10 +58,10 @@ public class SimpleTransportPlugin implements 
ITransportPlugin {
     private int port;
 
     @Override
-    public void prepare(ThriftConnectionType type, Map<String, Object> 
topoConf, Configuration login_conf) {
+    public void prepare(ThriftConnectionType type, Map<String, Object> 
topoConf, Configuration loginConf) {
         this.type = type;
         this.topoConf = topoConf;
-        this.login_conf = login_conf;
+        this.login_conf = loginConf;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java 
b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
index ff3227f..040356f 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
@@ -92,7 +92,7 @@ public class ThriftClient implements AutoCloseable {
             Configuration login_conf = AuthUtils.GetConfiguration(_conf);
 
             //construct a transport plugin
-            ITransportPlugin transportPlugin = 
AuthUtils.GetTransportPlugin(_type, _conf, login_conf);
+            ITransportPlugin transportPlugin = 
AuthUtils.getTransportPlugin(_type, _conf, login_conf);
 
             //TODO get this from type instead of hardcoding to Nimbus.
             //establish client-server transport via plugin

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java 
b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java
index 2d86e47..398dbeb 100644
--- 
a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java
+++ 
b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java
@@ -15,8 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.security.auth;
 
+import org.apache.storm.generated.WorkerTokenServiceType;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.Config;
 
@@ -27,47 +29,53 @@ import java.util.Map;
  */
 public enum ThriftConnectionType {
     NIMBUS(Config.NIMBUS_THRIFT_TRANSPORT_PLUGIN, Config.NIMBUS_THRIFT_PORT, 
Config.NIMBUS_QUEUE_SIZE,
-         Config.NIMBUS_THRIFT_THREADS, Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, 
Config.STORM_THRIFT_SOCKET_TIMEOUT_MS),
+         Config.NIMBUS_THRIFT_THREADS, Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, 
Config.STORM_THRIFT_SOCKET_TIMEOUT_MS,
+        WorkerTokenServiceType.NIMBUS),
+    //A DRPC token only works for the invocations transport, not for the basic 
thrift transport.
     DRPC(Config.DRPC_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_PORT, 
Config.DRPC_QUEUE_SIZE,
-         Config.DRPC_WORKER_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null),
+         Config.DRPC_WORKER_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null, null),
     DRPC_INVOCATIONS(Config.DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN, 
Config.DRPC_INVOCATIONS_PORT, null,
-         Config.DRPC_INVOCATIONS_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null),
+         Config.DRPC_INVOCATIONS_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null, 
WorkerTokenServiceType.DRPC),
     LOCAL_FAKE;
 
-    private final String _transConf;
-    private final String _portConf;
-    private final String _qConf;
-    private final String _threadsConf;
-    private final String _buffConf;
-    private final String _socketTimeoutConf;
-    private final boolean _isFake;
+    private final String transConf;
+    private final String portConf;
+    private final String qConf;
+    private final String threadsConf;
+    private final String buffConf;
+    private final String socketTimeoutConf;
+    private final boolean isFake;
+    private final WorkerTokenServiceType wtType;
 
     ThriftConnectionType() {
-        this(null, null, null, null, null, null, true);
+        this(null, null, null, null, null, null, true, null);
     }
     
     ThriftConnectionType(String transConf, String portConf, String qConf,
-            String threadsConf, String buffConf, String socketTimeoutConf) {
-        this(transConf, portConf, qConf, threadsConf, buffConf, 
socketTimeoutConf, false);
+                         String threadsConf, String buffConf, String 
socketTimeoutConf,
+                         WorkerTokenServiceType wtType) {
+        this(transConf, portConf, qConf, threadsConf, buffConf, 
socketTimeoutConf, false, wtType);
     }
     
     ThriftConnectionType(String transConf, String portConf, String qConf,
-                         String threadsConf, String buffConf, String 
socketTimeoutConf, boolean isFake) {
-        _transConf = transConf;
-        _portConf = portConf;
-        _qConf = qConf;
-        _threadsConf = threadsConf;
-        _buffConf = buffConf;
-        _socketTimeoutConf = socketTimeoutConf;
-        _isFake = isFake;
+                         String threadsConf, String buffConf, String 
socketTimeoutConf, boolean isFake,
+                         WorkerTokenServiceType wtType) {
+        this.transConf = transConf;
+        this.portConf = portConf;
+        this.qConf = qConf;
+        this.threadsConf = threadsConf;
+        this.buffConf = buffConf;
+        this.socketTimeoutConf = socketTimeoutConf;
+        this.isFake = isFake;
+        this.wtType = wtType;
     }
 
     public boolean isFake() {
-        return _isFake;
+        return isFake;
     }
     
     public String getTransportPlugin(Map<String, Object> conf) {
-        String ret = (String)conf.get(_transConf);
+        String ret = (String)conf.get(transConf);
         if (ret == null) {
             ret = (String)conf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN);
         }
@@ -75,37 +83,44 @@ public enum ThriftConnectionType {
     }
 
     public int getPort(Map<String, Object> conf) {
-        if (_isFake) {
+        if (isFake) {
             return -1;
         }
-        return ObjectReader.getInt(conf.get(_portConf));
+        return ObjectReader.getInt(conf.get(portConf));
     }
 
     public Integer getQueueSize(Map<String, Object> conf) {
-        if (_qConf == null) {
+        if (qConf == null) {
             return null;
         }
-        return (Integer)conf.get(_qConf);
+        return (Integer)conf.get(qConf);
     }
 
     public int getNumThreads(Map<String, Object> conf) {
-        if (_isFake) {
+        if (isFake) {
             return 1;
         }
-        return ObjectReader.getInt(conf.get(_threadsConf));
+        return ObjectReader.getInt(conf.get(threadsConf));
     }
 
     public int getMaxBufferSize(Map<String, Object> conf) {
-        if (_isFake) {
+        if (isFake) {
             return 1;
         }
-        return ObjectReader.getInt(conf.get(_buffConf));
+        return ObjectReader.getInt(conf.get(buffConf));
     }
 
     public Integer getSocketTimeOut(Map<String, Object> conf) {
-        if (_socketTimeoutConf == null) {
+        if (socketTimeoutConf == null) {
             return null;
         }
-        return ObjectReader.getInt(conf.get(_socketTimeoutConf));
+        return ObjectReader.getInt(conf.get(socketTimeoutConf));
+    }
+
+    /**
+     * Get the corresponding worker token type for this thrift connection.
+     * @return the worker token service type, or null if none was given for this
+     *     connection type (as is the case for DRPC and LOCAL_FAKE).
+     */
+    public WorkerTokenServiceType getWtType() {
+        return wtType;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java 
b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
index 8fa8a7e..608278d 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
@@ -15,13 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.security.auth;
 
 import java.io.IOException;
 import java.util.Map;
-
 import javax.security.auth.login.Configuration;
-
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TTransportException;
@@ -30,30 +29,32 @@ import org.slf4j.LoggerFactory;
 
 public class ThriftServer {
     private static final Logger LOG = 
LoggerFactory.getLogger(ThriftServer.class);
-    private Map _topoConf; //storm configuration
-    protected TProcessor _processor = null;
-    private final ThriftConnectionType _type;
-    private TServer _server;
-    private Configuration _login_conf;
-    private int _port;
+    private final Map<String, Object> conf; //storm configuration
+    protected final TProcessor processor;
+    private final ThriftConnectionType type;
+    private TServer server;
+    private Configuration loginConf;
+    private int port;
+    private boolean areWorkerTokensSupported;
     
-    public ThriftServer(Map<String, Object> topoConf, TProcessor processor, 
ThriftConnectionType type) {
-        _topoConf = topoConf;
-        _processor = processor;
-        _type = type;
+    public ThriftServer(Map<String, Object> conf, TProcessor processor, 
ThriftConnectionType type) {
+        this.conf = conf;
+        this.processor = processor;
+        this.type = type;
 
         try {
             //retrieve authentication configuration 
-            _login_conf = AuthUtils.GetConfiguration(_topoConf);
+            loginConf = AuthUtils.GetConfiguration(this.conf);
         } catch (Exception x) {
             LOG.error(x.getMessage(), x);
         }
         try {
             //locate our thrift transport plugin
-            ITransportPlugin transportPlugin = 
AuthUtils.GetTransportPlugin(_type, _topoConf, _login_conf);
+            ITransportPlugin transportPlugin = 
AuthUtils.getTransportPlugin(this.type, this.conf, loginConf);
             //server
-            _server = transportPlugin.getServer(_processor);
-            _port = transportPlugin.getPort();
+            server = transportPlugin.getServer(this.processor);
+            port = transportPlugin.getPort();
+            areWorkerTokensSupported = 
transportPlugin.areWorkerTokensSupported();
         } catch (IOException | TTransportException ex) {
             handleServerException(ex);
         }
@@ -61,20 +62,20 @@ public class ThriftServer {
     }
 
     public void stop() {
-        _server.stop();
+        server.stop();
     }
 
     /**
      * @return true if ThriftServer is listening to requests?
      */
     public boolean isServing() {
-        return _server.isServing();
+        return server.isServing();
     }
     
     public void serve()  {
         try {
             //start accepting requests
-            _server.serve();
+            server.serve();
         } catch (Exception ex) {
             handleServerException(ex);
         }
@@ -82,8 +83,8 @@ public class ThriftServer {
     
     private void handleServerException(Exception ex) {
         LOG.error("ThriftServer is being stopped due to: " + ex, ex);
-        if (_server != null) {
-            _server.stop();
+        if (server != null) {
+            server.stop();
         }
         Runtime.getRuntime().halt(1); //shutdown server process since we could 
not handle Thrift requests any more
     }
@@ -92,6 +93,14 @@ public class ThriftServer {
      * @return The port this server is/will be listening on
      */
     public int getPort() {
-        return _port;
+        return port;
+    }
+
+    /**
+     * Check if worker tokens are supported by this thrift server.
+     * The value is the one reported by the transport plugin's
+     * {@code areWorkerTokensSupported()} when this server was constructed.
+     * @return true if they are else false.
+     */
+    public boolean supportsWorkerTokens() {
+        return areWorkerTokensSupported;
     }
 }

Reply via email to