http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/schema/Schema.java
----------------------------------------------------------------------
diff --git 
a/streams-util/src/main/java/org/apache/streams/util/schema/Schema.java 
b/streams-util/src/main/java/org/apache/streams/util/schema/Schema.java
index 795bf98..a9517c1 100644
--- a/streams-util/src/main/java/org/apache/streams/util/schema/Schema.java
+++ b/streams-util/src/main/java/org/apache/streams/util/schema/Schema.java
@@ -15,6 +15,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.util.schema;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -26,49 +27,69 @@ import java.net.URI;
  */
 public class Schema {
 
-    private final URI id;
-    private final URI uri;
-    private final JsonNode content;
-    private final Schema parent;
-    private final boolean generate;
+  private final URI id;
+  private final URI uri;
+  private final JsonNode content;
+  private final Schema parent;
+  private final boolean generate;
 
-    public Schema(URI uri, JsonNode content, Schema parent, boolean generate) {
-        this.uri = uri;
-        this.content = content;
-        this.parent = parent;
-        this.generate = generate;
-        this.id = content.has("id") ? URI.create(content.get("id").asText()) : 
null;
-    }
+  /**
+   * Schema constructor.
+   * @param uri uri
+   * @param content JsonNode content
+   * @param parent Schema parent
+   * @param generate whether to generate
+   */
+  public Schema(URI uri, JsonNode content, Schema parent, boolean generate) {
+    this.uri = uri;
+    this.content = content;
+    this.parent = parent;
+    this.generate = generate;
+    this.id = content.has("id") ? URI.create(content.get("id").asText()) : 
null;
+  }
 
-    public URI getId() {
-        return id;
-    }
+  public URI getId() {
+    return id;
+  }
 
-    public URI getURI() {
-        return uri;
-    }
+  public URI getUri() {
+    return uri;
+  }
 
-    public JsonNode getContent() {
-        return content;
-    }
+  public JsonNode getContent() {
+    return content;
+  }
 
-    public JsonNode getParentContent() {
-        if( parent != null )
-            return parent.getContent();
-        else return null;
+  /**
+   * getParentContent.
+   * @return Parent.Content
+   */
+  public JsonNode getParentContent() {
+    if ( parent != null ) {
+      return parent.getContent();
+    } else {
+      return null;
     }
+  }
 
-    public URI getParentURI() {
-        if( parent != null ) return parent.getURI();
-        else return null;
+  /**
+   * getParentUri.
+   * @return Parent.Uri
+   */
+  public URI getParentUri() {
+    if ( parent != null ) {
+      return parent.getUri();
+    } else {
+      return null;
     }
+  }
 
-    public boolean isGenerated() {
-        return generate;
-    }
+  public boolean isGenerated() {
+    return generate;
+  }
 
-    public Schema getParent() {
-        return parent;
-    }
+  public Schema getParent() {
+    return parent;
+  }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/schema/SchemaStore.java
----------------------------------------------------------------------
diff --git 
a/streams-util/src/main/java/org/apache/streams/util/schema/SchemaStore.java 
b/streams-util/src/main/java/org/apache/streams/util/schema/SchemaStore.java
index 779df41..4fca239 100644
--- a/streams-util/src/main/java/org/apache/streams/util/schema/SchemaStore.java
+++ b/streams-util/src/main/java/org/apache/streams/util/schema/SchemaStore.java
@@ -15,6 +15,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.util.schema;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -27,33 +28,34 @@ import java.util.Iterator;
 /**
  * A SchemaStore resolves and indexes json schemas and makes their properties 
available.
  *
+ * <p/>
  * Implementations include
  * - SchemaStoreImpl
  */
 public interface SchemaStore extends Comparator<Schema> {
-    
-    Schema create(URI uri);
 
-    Schema create(Schema parent, String path);
+  Schema create(URI uri);
+
+  Schema create(Schema parent, String path);
 
-    void clearCache();
+  void clearCache();
 
-    Integer getSize();
+  Integer getSize();
 
-    Optional<Schema> getById(URI id);
+  Optional<Schema> getById(URI id);
 
-    Optional<Schema> getByUri(URI uri);
+  Optional<Schema> getByUri(URI uri);
 
-    Integer getFileUriCount();
+  Integer getFileUriCount();
 
-    Integer getHttpUriCount();
+  Integer getHttpUriCount();
 
-    Iterator<Schema> getSchemaIterator();
+  Iterator<Schema> getSchemaIterator();
 
-    ObjectNode resolveProperties(Schema schema, ObjectNode fieldNode, String 
resourceId);
+  ObjectNode resolveProperties(Schema schema, ObjectNode fieldNode, String 
resourceId);
 
-    ObjectNode resolveItems(Schema schema, ObjectNode fieldNode, String 
resourceId);
+  ObjectNode resolveItems(Schema schema, ObjectNode fieldNode, String 
resourceId);
 
-    @Override
-    int compare(Schema left, Schema right);
+  @Override
+  int compare(Schema left, Schema right);
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/schema/SchemaStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/streams-util/src/main/java/org/apache/streams/util/schema/SchemaStoreImpl.java
 
b/streams-util/src/main/java/org/apache/streams/util/schema/SchemaStoreImpl.java
index 7126c82..e99380d 100644
--- 
a/streams-util/src/main/java/org/apache/streams/util/schema/SchemaStoreImpl.java
+++ 
b/streams-util/src/main/java/org/apache/streams/util/schema/SchemaStoreImpl.java
@@ -15,6 +15,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.util.schema;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -36,329 +37,364 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.streams.util.schema.URIUtil.safeResolve;
+import static org.apache.streams.util.schema.UriUtil.safeResolve;
 
 /**
- * Created by steve on 4/30/16.
+ * Default Implementation of SchemaStore.
  */
 public class SchemaStoreImpl extends Ordering<Schema> implements SchemaStore {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(SchemaStore.class);
-    private final static JsonNodeFactory NODE_FACTORY = 
JsonNodeFactory.instance;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SchemaStore.class);
+  private static final JsonNodeFactory NODE_FACTORY = JsonNodeFactory.instance;
 
-    protected Map<URI, Schema> schemas = new HashMap();
-    protected FragmentResolver fragmentResolver = new FragmentResolver();
-    protected ContentResolver contentResolver = new ContentResolver();
+  protected Map<URI, Schema> schemas = new HashMap();
+  protected FragmentResolver fragmentResolver = new FragmentResolver();
+  protected ContentResolver contentResolver = new ContentResolver();
 
-    public SchemaStoreImpl() {
-    }
+  public SchemaStoreImpl() {
+  }
 
-    @Override
-    public synchronized Schema create(URI uri) {
-        if(!getByUri(uri).isPresent()) {
-            URI baseURI = URIUtil.removeFragment(uri);
-            JsonNode baseNode = this.contentResolver.resolve(baseURI);
-            if(uri.toString().contains("#") && !uri.toString().endsWith("#")) {
-                Schema newSchema = new Schema(baseURI, baseNode, null, true);
-                this.schemas.put(baseURI, newSchema);
-                JsonNode childContent = 
this.fragmentResolver.resolve(baseNode, '#' + 
StringUtils.substringAfter(uri.toString(), "#"));
-                this.schemas.put(uri, new Schema(uri, childContent, newSchema, 
false));
-            } else {
-                if( baseNode.has("extends") && 
baseNode.get("extends").isObject()) {
-                    URI ref = 
URI.create(((ObjectNode)baseNode.get("extends")).get("$ref").asText());
-                    URI absoluteURI;
-                    if( ref.isAbsolute())
-                        absoluteURI = ref;
-                    else
-                        absoluteURI = baseURI.resolve(ref);
-                    JsonNode parentNode = 
this.contentResolver.resolve(absoluteURI);
-                    Schema parentSchema = null;
-                    if( this.schemas.get(absoluteURI) != null ) {
-                        parentSchema = this.schemas.get(absoluteURI);
-                    } else {
-                        parentSchema = create(absoluteURI);
-                    }
-                    this.schemas.put(uri, new Schema(uri, baseNode, 
parentSchema, true));
-                } else {
-                    this.schemas.put(uri, new Schema(uri, baseNode, null, 
true));
-                }
-            }
-            List<JsonNode> refs = baseNode.findValues("$ref");
-            for( JsonNode ref : refs ) {
-                if( ref.isValueNode() ) {
-                    String refVal = ref.asText();
-                    URI refURI = null;
-                    try {
-                        refURI = URI.create(refVal);
-                    } catch( Exception e ) {
-                        LOGGER.info("Exception: {}", e.getMessage());
-                    }
-                    if (refURI != null && !getByUri(refURI).isPresent()) {
-                        if (refURI.isAbsolute())
-                            create(refURI);
-                        else
-                            create(baseURI.resolve(refURI));
-                    }
-                }
-            }
-        }
-
-        return this.schemas.get(uri);
-    }
-
-    @Override
-    public Schema create(Schema parent, String path) {
-        if(path.equals("#")) {
-            return parent;
+  @Override
+  public synchronized Schema create(URI uri) {
+    if (!getByUri(uri).isPresent()) {
+      URI baseUri = UriUtil.removeFragment(uri);
+      JsonNode baseNode = this.contentResolver.resolve(baseUri);
+      if (uri.toString().contains("#") && !uri.toString().endsWith("#")) {
+        Schema newSchema = new Schema(baseUri, baseNode, null, true);
+        this.schemas.put(baseUri, newSchema);
+        JsonNode childContent = this.fragmentResolver.resolve(baseNode, '#' + 
StringUtils.substringAfter(uri.toString(), "#"));
+        this.schemas.put(uri, new Schema(uri, childContent, newSchema, false));
+      } else {
+        if ( baseNode.has("extends") && baseNode.get("extends").isObject()) {
+          URI ref = 
URI.create(((ObjectNode)baseNode.get("extends")).get("$ref").asText());
+          URI absoluteUri;
+          if ( ref.isAbsolute()) {
+            absoluteUri = ref;
+          } else {
+            absoluteUri = baseUri.resolve(ref);
+          }
+          JsonNode parentNode = this.contentResolver.resolve(absoluteUri);
+          Schema parentSchema = null;
+          if ( this.schemas.get(absoluteUri) != null ) {
+            parentSchema = this.schemas.get(absoluteUri);
+          } else {
+            parentSchema = create(absoluteUri);
+          }
+          this.schemas.put(uri, new Schema(uri, baseNode, parentSchema, true));
         } else {
-            path = StringUtils.stripEnd(path, "#?&/");
-            URI id = parent != null && parent.getId() != 
null?parent.getId().resolve(path):URI.create(path);
-            if(this.selfReferenceWithoutParentFile(parent, path)) {
-                this.schemas.put(id, new Schema(id, 
this.fragmentResolver.resolve(parent.getParentContent(), path), parent, false));
-                return this.schemas.get(id);
+          this.schemas.put(uri, new Schema(uri, baseNode, null, true));
+        }
+      }
+      List<JsonNode> refs = baseNode.findValues("$ref");
+      for ( JsonNode ref : refs ) {
+        if ( ref.isValueNode() ) {
+          String refVal = ref.asText();
+          URI refUri = null;
+          try {
+            refUri = URI.create(refVal);
+          } catch ( Exception ex ) {
+            LOGGER.info("Exception: {}", ex.getMessage());
+          }
+          if (refUri != null && !getByUri(refUri).isPresent()) {
+            if (refUri.isAbsolute()) {
+              create(refUri);
             } else {
-                return this.create(id);
+              create(baseUri.resolve(refUri));
             }
+          }
         }
+      }
     }
 
-    protected boolean selfReferenceWithoutParentFile(Schema parent, String 
path) {
-        return parent != null && (parent.getId() == null || 
parent.getId().toString().startsWith("#/")) && path.startsWith("#/");
-    }
+    return this.schemas.get(uri);
+  }
 
-    @Override
-    public synchronized void clearCache() {
-        this.schemas.clear();
+  @Override
+  public Schema create(Schema parent, String path) {
+    if (path.equals("#")) {
+      return parent;
+    } else {
+      path = StringUtils.stripEnd(path, "#?&/");
+      URI id = (parent != null && parent.getId() != null)
+          ? parent.getId().resolve(path)
+          : URI.create(path);
+      if (this.selfReferenceWithoutParentFile(parent, path)) {
+        this.schemas.put(id, new Schema(id, 
this.fragmentResolver.resolve(parent.getParentContent(), path), parent, false));
+        return this.schemas.get(id);
+      } else {
+        return this.create(id);
+      }
     }
+  }
 
-    @Override
-    public Integer getSize() {
-        return schemas.size();
-    }
+  protected boolean selfReferenceWithoutParentFile(Schema parent, String path) 
{
+    return parent != null && (parent.getId() == null || 
parent.getId().toString().startsWith("#/")) && path.startsWith("#/");
+  }
 
-    @Override
-    public Optional<Schema> getById(URI id) {
-        for( Schema schema : schemas.values() ) {
-            if( schema.getId() != null && schema.getId().equals(id) )
-                return Optional.of(schema);
-        }
-        return Optional.absent();
-    }
+  @Override
+  public synchronized void clearCache() {
+    this.schemas.clear();
+  }
 
-    @Override
-    public Optional<Schema> getByUri(URI uri) {
-        for( Schema schema : schemas.values() ) {
-            if( schema.getURI().equals(uri) )
-                return Optional.of(schema);
-        }
-        return Optional.absent();
-    }
+  @Override
+  public Integer getSize() {
+    return schemas.size();
+  }
 
-    @Override
-    public Integer getFileUriCount() {
-        int count = 0;
-        for( Schema schema : schemas.values() ) {
-            if( schema.getURI().getScheme().equals("file") )
-                count++;
-        }
-        return count;
+  @Override
+  public Optional<Schema> getById(URI id) {
+    for ( Schema schema : schemas.values() ) {
+      if ( schema.getId() != null && schema.getId().equals(id) ) {
+        return Optional.of(schema);
+      }
     }
+    return Optional.absent();
+  }
 
-    @Override
-    public Integer getHttpUriCount() {
-        int count = 0;
-        for( Schema schema : schemas.values() ) {
-            if( schema.getURI().getScheme().equals("http") )
-                count++;
-        }
-        return count;
+  @Override
+  public Optional<Schema> getByUri(URI uri) {
+    for ( Schema schema : schemas.values() ) {
+      if ( schema.getUri().equals(uri) ) {
+        return Optional.of(schema);
+      }
     }
+    return Optional.absent();
+  }
 
-    @Override
-    public Iterator<Schema> getSchemaIterator() {
-        List<Schema> schemaList = Lists.newArrayList(schemas.values());
-        Collections.sort(schemaList, this);
-        return schemaList.iterator();
+  @Override
+  public Integer getFileUriCount() {
+    int count = 0;
+    for ( Schema schema : schemas.values() ) {
+      if ( schema.getUri().getScheme().equals("file") ) {
+        count++;
+      }
     }
+    return count;
+  }
 
-    @Override
-    public ObjectNode resolveProperties(Schema schema, ObjectNode fieldNode, 
String resourceId) {
-        // this should return something more suitable like:
-        //   Map<String, Pair<Schema, ObjectNode>>
-        ObjectNode schemaProperties = NODE_FACTORY.objectNode();
-        ObjectNode parentProperties = NODE_FACTORY.objectNode();
-        if (fieldNode == null) {
-            ObjectNode schemaContent = (ObjectNode) schema.getContent();
-            if( schemaContent.has("properties") ) {
-                schemaProperties = (ObjectNode) 
schemaContent.get("properties");
-                if (schema.getParentContent() != null) {
-                    ObjectNode parentContent = (ObjectNode) 
schema.getParentContent();
-                    if (parentContent.has("properties")) {
-                        parentProperties = (ObjectNode) 
parentContent.get("properties");
-                    }
-                }
-            }
-        } else if (fieldNode != null && fieldNode.size() > 0) {
-            if( fieldNode.has("properties") && 
fieldNode.get("properties").isObject() && fieldNode.get("properties").size() > 
0 )
-                schemaProperties = (ObjectNode) fieldNode.get("properties");
-            URI parentURI = null;
-            if( fieldNode.has("$ref") || fieldNode.has("extends") ) {
-                JsonNode refNode = fieldNode.get("$ref");
-                JsonNode extendsNode = fieldNode.get("extends");
-                if (refNode != null && refNode.isValueNode())
-                    parentURI = URI.create(refNode.asText());
-                else if (extendsNode != null && extendsNode.isObject())
-                    parentURI = URI.create(extendsNode.get("$ref").asText());
-                ObjectNode parentContent = null;
-                URI absoluteURI;
-                if (parentURI.isAbsolute())
-                    absoluteURI = parentURI;
-                else {
-                    absoluteURI = schema.getURI().resolve(parentURI);
-                    if (!absoluteURI.isAbsolute() || (absoluteURI.isAbsolute() 
&& !getByUri(absoluteURI).isPresent() ))
-                        absoluteURI = schema.getParentURI().resolve(parentURI);
-                }
-                if (absoluteURI != null && absoluteURI.isAbsolute()) {
-                    if (getByUri(absoluteURI).isPresent())
-                        parentContent = (ObjectNode) 
getByUri(absoluteURI).get().getContent();
-                    if (parentContent != null && parentContent.isObject() && 
parentContent.has("properties")) {
-                        parentProperties = (ObjectNode) 
parentContent.get("properties");
-                    } else if (absoluteURI.getPath().endsWith("#properties")) {
-                        absoluteURI = 
URI.create(absoluteURI.toString().replace("#properties", ""));
-                        parentProperties = (ObjectNode) 
getByUri(absoluteURI).get().getContent().get("properties");
-                    }
-                }
-            }
+  @Override
+  public Integer getHttpUriCount() {
+    int count = 0;
+    for ( Schema schema : schemas.values() ) {
+      if ( schema.getUri().getScheme().equals("http") ) {
+        count++;
+      }
+    }
+    return count;
+  }
 
+  @Override
+  public Iterator<Schema> getSchemaIterator() {
+    List<Schema> schemaList = Lists.newArrayList(schemas.values());
+    Collections.sort(schemaList, this);
+    return schemaList.iterator();
+  }
 
+  @Override
+  public ObjectNode resolveProperties(Schema schema, ObjectNode fieldNode, 
String resourceId) {
+    // this should return something more suitable like:
+    //   Map<String, Pair<Schema, ObjectNode>>
+    ObjectNode schemaProperties = NODE_FACTORY.objectNode();
+    ObjectNode parentProperties = NODE_FACTORY.objectNode();
+    if (fieldNode == null) {
+      ObjectNode schemaContent = (ObjectNode) schema.getContent();
+      if (schemaContent.has("properties")) {
+        schemaProperties = (ObjectNode) schemaContent.get("properties");
+        if (schema.getParentContent() != null) {
+          ObjectNode parentContent = (ObjectNode) schema.getParentContent();
+          if (parentContent.has("properties")) {
+            parentProperties = (ObjectNode) parentContent.get("properties");
+          }
+        }
+      }
+    } else if (fieldNode != null && fieldNode.size() > 0) {
+      if (fieldNode.has("properties") && 
fieldNode.get("properties").isObject() && fieldNode.get("properties").size() > 
0) {
+        schemaProperties = (ObjectNode) fieldNode.get("properties");
+      }
+      URI parentUri = null;
+      if ( fieldNode.has("$ref") || fieldNode.has("extends") ) {
+        JsonNode refNode = fieldNode.get("$ref");
+        JsonNode extendsNode = fieldNode.get("extends");
+        if (refNode != null && refNode.isValueNode()) {
+          parentUri = URI.create(refNode.asText());
+        } else if (extendsNode != null && extendsNode.isObject()) {
+          parentUri = URI.create(extendsNode.get("$ref").asText());
         }
+        ObjectNode parentContent = null;
+        URI absoluteUri;
+        if (parentUri.isAbsolute()) {
+          absoluteUri = parentUri;
+        } else {
+          absoluteUri = schema.getUri().resolve(parentUri);
+          if (!absoluteUri.isAbsolute() || (absoluteUri.isAbsolute() && 
!getByUri(absoluteUri).isPresent() )) {
+            absoluteUri = schema.getParentUri().resolve(parentUri);
+          }
+        }
+        if (absoluteUri != null && absoluteUri.isAbsolute()) {
+          if (getByUri(absoluteUri).isPresent()) {
+            parentContent = (ObjectNode) 
getByUri(absoluteUri).get().getContent();
+          }
+          if (parentContent != null && parentContent.isObject() && 
parentContent.has("properties")) {
+            parentProperties = (ObjectNode) parentContent.get("properties");
+          } else if (absoluteUri.getPath().endsWith("#properties")) {
+            absoluteUri = 
URI.create(absoluteUri.toString().replace("#properties", ""));
+            parentProperties = (ObjectNode) 
getByUri(absoluteUri).get().getContent().get("properties");
+          }
+        }
+      }
 
-        ObjectNode resolvedProperties = NODE_FACTORY.objectNode();
-        if (parentProperties != null && parentProperties.size() > 0)
-            resolvedProperties = SchemaUtil.mergeProperties(schemaProperties, 
parentProperties);
-        else resolvedProperties = schemaProperties.deepCopy();
 
-        return resolvedProperties;
     }
 
-    public ObjectNode resolveItems(Schema schema, ObjectNode fieldNode, String 
resourceId) {
-        ObjectNode schemaItems = NODE_FACTORY.objectNode();
-        ObjectNode parentItems = NODE_FACTORY.objectNode();
-        if (fieldNode == null) {
-            ObjectNode schemaContent = (ObjectNode) schema.getContent();
-            if( schemaContent.has("items") ) {
-                schemaItems = (ObjectNode) schemaContent.get("items");
-                if (schema.getParentContent() != null) {
-                    ObjectNode parentContent = (ObjectNode) 
schema.getParentContent();
-                    if (parentContent.has("items")) {
-                        parentItems = (ObjectNode) parentContent.get("items");
-                    }
-                }
-            }
-        } else if (fieldNode != null && fieldNode.size() > 0) {
-            if (fieldNode.has("items") && fieldNode.get("items").isObject() && 
fieldNode.get("items").size() > 0)
-                schemaItems = (ObjectNode) fieldNode.get("items");
-            URI parentURI = null;
-            if( fieldNode.has("$ref") || fieldNode.has("extends") ) {
-                JsonNode refNode = fieldNode.get("$ref");
-                JsonNode extendsNode = fieldNode.get("extends");
-                if (refNode != null && refNode.isValueNode())
-                    parentURI = URI.create(refNode.asText());
-                else if (extendsNode != null && extendsNode.isObject())
-                    parentURI = URI.create(extendsNode.get("$ref").asText());
-                ObjectNode parentContent = null;
-                URI absoluteURI;
-                if (parentURI.isAbsolute())
-                    absoluteURI = parentURI;
-                else {
-                    absoluteURI = schema.getURI().resolve(parentURI);
-                    if (!absoluteURI.isAbsolute() || (absoluteURI.isAbsolute() 
&& !getByUri(absoluteURI).isPresent() ))
-                        absoluteURI = schema.getParentURI().resolve(parentURI);
-                }
-                if (absoluteURI != null && absoluteURI.isAbsolute()) {
-                    if (getByUri(absoluteURI).isPresent())
-                        parentContent = (ObjectNode) 
getByUri(absoluteURI).get().getContent();
-                    if (parentContent != null && parentContent.isObject() && 
parentContent.has("items")) {
-                        parentItems = (ObjectNode) parentContent.get("items");
-                    } else if (absoluteURI.getPath().endsWith("#items")) {
-                        absoluteURI = 
URI.create(absoluteURI.toString().replace("#items", ""));
-                        parentItems = (ObjectNode) 
getByUri(absoluteURI).get().getContent().get("items");
-                    }
-                }
-            }
-        }
-
-        ObjectNode resolvedItems = NODE_FACTORY.objectNode();
-        if (parentItems != null && parentItems.size() > 0)
-            resolvedItems = SchemaUtil.mergeProperties(schemaItems, 
parentItems);
-        else resolvedItems = schemaItems.deepCopy();
-
-        return resolvedItems;
+    ObjectNode resolvedProperties = NODE_FACTORY.objectNode();
+    if (parentProperties != null && parentProperties.size() > 0) {
+      resolvedProperties = SchemaUtil.mergeProperties(schemaProperties, 
parentProperties);
+    } else {
+      resolvedProperties = schemaProperties.deepCopy();
     }
 
-    @Override
-    public int compare(Schema left, Schema right) {
-        // are they the same?
-        if( left.equals(right)) return 0;
-        // is one an ancestor of the other
-        Schema candidateAncestor = left;
-        while( candidateAncestor.getParent() != null ) {
-            candidateAncestor = candidateAncestor.getParent();
-            if( candidateAncestor.equals(right))
-                return 1;
+    return resolvedProperties;
+  }
+
+  /**
+   * resolve full definition of 'items'.
+   * @param schema Schema
+   * @param fieldNode ObjectNode
+   * @param resourceId resourceId
+   * @return ObjectNode
+   */
+  public ObjectNode resolveItems(Schema schema, ObjectNode fieldNode, String 
resourceId) {
+    ObjectNode schemaItems = NODE_FACTORY.objectNode();
+    ObjectNode parentItems = NODE_FACTORY.objectNode();
+    if (fieldNode == null) {
+      ObjectNode schemaContent = (ObjectNode) schema.getContent();
+      if ( schemaContent.has("items") ) {
+        schemaItems = (ObjectNode) schemaContent.get("items");
+        if (schema.getParentContent() != null) {
+          ObjectNode parentContent = (ObjectNode) schema.getParentContent();
+          if (parentContent.has("items")) {
+            parentItems = (ObjectNode) parentContent.get("items");
+          }
         }
-        candidateAncestor = right;
-        while( candidateAncestor.getParent() != null ) {
-            candidateAncestor = candidateAncestor.getParent();
-            if( candidateAncestor.equals(left))
-                return -1;
+      }
+    } else if (fieldNode != null && fieldNode.size() > 0) {
+      if (fieldNode.has("items") && fieldNode.get("items").isObject() && 
fieldNode.get("items").size() > 0) {
+        schemaItems = (ObjectNode) fieldNode.get("items");
+      }
+      URI parentUri = null;
+      if ( fieldNode.has("$ref") || fieldNode.has("extends") ) {
+        JsonNode refNode = fieldNode.get("$ref");
+        JsonNode extendsNode = fieldNode.get("extends");
+        if (refNode != null && refNode.isValueNode()) {
+          parentUri = URI.create(refNode.asText());
+        } else if (extendsNode != null && extendsNode.isObject()) {
+          parentUri = URI.create(extendsNode.get("$ref").asText());
         }
-        // does one have a field that reference the other?
-        for( JsonNode refNode : left.getContent().findValues("$ref") ) {
-            String refText = refNode.asText();
-            Optional<URI> resolvedURI = safeResolve(left.getURI(), refText);
-            if( resolvedURI.isPresent() && 
resolvedURI.get().equals(right.getURI()))
-                return 1;
+        ObjectNode parentContent = null;
+        URI absoluteUri;
+        if (parentUri.isAbsolute()) {
+          absoluteUri = parentUri;
+        } else {
+          absoluteUri = schema.getUri().resolve(parentUri);
+          if (!absoluteUri.isAbsolute() || (absoluteUri.isAbsolute() && 
!getByUri(absoluteUri).isPresent() )) {
+            absoluteUri = schema.getParentUri().resolve(parentUri);
+          }
         }
-        for( JsonNode refNode : right.getContent().findValues("$ref") ) {
-            String refText = refNode.asText();
-            Optional<URI> resolvedURI = safeResolve(right.getURI(), refText);
-            if( resolvedURI.isPresent() && 
resolvedURI.get().equals(left.getURI()))
-                return -1;
+        if (absoluteUri != null && absoluteUri.isAbsolute()) {
+          if (getByUri(absoluteUri).isPresent()) {
+            parentContent = (ObjectNode) 
getByUri(absoluteUri).get().getContent();
+          }
+          if (parentContent != null && parentContent.isObject() && 
parentContent.has("items")) {
+            parentItems = (ObjectNode) parentContent.get("items");
+          } else if (absoluteUri.getPath().endsWith("#items")) {
+            absoluteUri = URI.create(absoluteUri.toString().replace("#items", 
""));
+            parentItems = (ObjectNode) 
getByUri(absoluteUri).get().getContent().get("items");
+          }
         }
-        // does one have a field that reference a third schema that references 
the other?
-        for( JsonNode refNode : left.getContent().findValues("$ref") ) {
-            String refText = refNode.asText();
-            Optional<URI> possibleConnectorURI = safeResolve(left.getURI(), 
refText);
-            if( possibleConnectorURI.isPresent()) {
-                Optional<Schema> possibleConnector = 
getByUri(possibleConnectorURI.get());
-                if (possibleConnector.isPresent()) {
-                    for (JsonNode connectorRefNode : 
possibleConnector.get().getContent().findValues("$ref")) {
-                        String connectorRefText = connectorRefNode.asText();
-                        Optional<URI> resolvedURI = 
safeResolve(possibleConnector.get().getURI(), connectorRefText);
-                        if (resolvedURI.isPresent() && 
resolvedURI.get().equals(right.getURI()))
-                            return 1;
-                    }
-                }
+      }
+    }
+
+    ObjectNode resolvedItems = NODE_FACTORY.objectNode();
+    if (parentItems != null && parentItems.size() > 0) {
+      resolvedItems = SchemaUtil.mergeProperties(schemaItems, parentItems);
+    } else {
+      resolvedItems = schemaItems.deepCopy();
+    }
+
+    return resolvedItems;
+  }
+
+  @Override
+  public int compare(Schema left, Schema right) {
+    // are they the same?
+    if ( left.equals(right)) {
+      return 0;
+    }
+    // is one an ancestor of the other
+    Schema candidateAncestor = left;
+    while ( candidateAncestor.getParent() != null ) {
+      candidateAncestor = candidateAncestor.getParent();
+      if ( candidateAncestor.equals(right)) {
+        return 1;
+      }
+    }
+    candidateAncestor = right;
+    while ( candidateAncestor.getParent() != null ) {
+      candidateAncestor = candidateAncestor.getParent();
+      if ( candidateAncestor.equals(left)) {
+        return -1;
+      }
+    }
+    // does one have a field that reference the other?
+    for ( JsonNode refNode : left.getContent().findValues("$ref") ) {
+      String refText = refNode.asText();
+      Optional<URI> resolvedUri = safeResolve(left.getUri(), refText);
+      if ( resolvedUri.isPresent() && 
resolvedUri.get().equals(right.getUri())) {
+        return 1;
+      }
+    }
+    for ( JsonNode refNode : right.getContent().findValues("$ref") ) {
+      String refText = refNode.asText();
+      Optional<URI> resolvedUri = safeResolve(right.getUri(), refText);
+      if ( resolvedUri.isPresent() && resolvedUri.get().equals(left.getUri())) 
{
+        return -1;
+      }
+    }
+    // does one have a field that reference a third schema that references the 
other?
+    for ( JsonNode refNode : left.getContent().findValues("$ref") ) {
+      String refText = refNode.asText();
+      Optional<URI> possibleConnectorUri = safeResolve(left.getUri(), refText);
+      if ( possibleConnectorUri.isPresent()) {
+        Optional<Schema> possibleConnector = 
getByUri(possibleConnectorUri.get());
+        if (possibleConnector.isPresent()) {
+          for (JsonNode connectorRefNode : 
possibleConnector.get().getContent().findValues("$ref")) {
+            String connectorRefText = connectorRefNode.asText();
+            Optional<URI> resolvedUri = 
safeResolve(possibleConnector.get().getUri(), connectorRefText);
+            if (resolvedUri.isPresent() && 
resolvedUri.get().equals(right.getUri())) {
+              return 1;
             }
+          }
         }
-        for( JsonNode refNode : right.getContent().findValues("$ref") ) {
-            String refText = refNode.asText();
-            Optional<URI> possibleConnectorURI = safeResolve(right.getURI(), 
refText);
-            if( possibleConnectorURI.isPresent()) {
-                Optional<Schema> possibleConnector = 
getByUri(possibleConnectorURI.get());
-                if (possibleConnector.isPresent()) {
-                    for (JsonNode connectorRefNode : 
possibleConnector.get().getContent().findValues("$ref")) {
-                        String connectorRefText = connectorRefNode.asText();
-                        Optional<URI> resolvedURI = 
safeResolve(possibleConnector.get().getURI(), connectorRefText);
-                        if (resolvedURI.isPresent() && 
resolvedURI.get().equals(left.getURI()))
-                            return -1;
-                    }
-                }
+      }
+    }
+    for ( JsonNode refNode : right.getContent().findValues("$ref") ) {
+      String refText = refNode.asText();
+      Optional<URI> possibleConnectorUri = safeResolve(right.getUri(), 
refText);
+      if ( possibleConnectorUri.isPresent()) {
+        Optional<Schema> possibleConnector = 
getByUri(possibleConnectorUri.get());
+        if (possibleConnector.isPresent()) {
+          for (JsonNode connectorRefNode : 
possibleConnector.get().getContent().findValues("$ref")) {
+            String connectorRefText = connectorRefNode.asText();
+            Optional<URI> resolvedUri = 
safeResolve(possibleConnector.get().getUri(), connectorRefText);
+            if (resolvedUri.isPresent() && 
resolvedUri.get().equals(left.getUri())) {
+              return -1;
             }
+          }
         }
-        return 0;
+      }
     }
+    return 0;
+  }
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/schema/SchemaUtil.java
----------------------------------------------------------------------
diff --git 
a/streams-util/src/main/java/org/apache/streams/util/schema/SchemaUtil.java 
b/streams-util/src/main/java/org/apache/streams/util/schema/SchemaUtil.java
index 785ec58..e4b7928 100644
--- a/streams-util/src/main/java/org/apache/streams/util/schema/SchemaUtil.java
+++ b/streams-util/src/main/java/org/apache/streams/util/schema/SchemaUtil.java
@@ -15,6 +15,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.util.schema;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -34,33 +35,44 @@ import static org.apache.commons.lang3.StringUtils.isEmpty;
  */
 public class SchemaUtil {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(SchemaUtil.class);
-    private static final JsonNodeFactory NODE_FACTORY = 
JsonNodeFactory.instance;
-    public static final String ILLEGAL_CHARACTER_REGEX = "[^0-9a-zA-Z_$]";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SchemaUtil.class);
+  private static final JsonNodeFactory NODE_FACTORY = JsonNodeFactory.instance;
+  public static final String ILLEGAL_CHARACTER_REGEX = "[^0-9a-zA-Z_$]";
 
-    public static String childQualifiedName(String parentQualifiedName, String 
childSimpleName) {
-        String safeChildName = 
childSimpleName.replaceAll(ILLEGAL_CHARACTER_REGEX, "_");
-        return isEmpty(parentQualifiedName) ? safeChildName : 
parentQualifiedName + "." + safeChildName;
-    }
+  public static String childQualifiedName(String parentQualifiedName, String 
childSimpleName) {
+    String safeChildName = childSimpleName.replaceAll(ILLEGAL_CHARACTER_REGEX, 
"_");
+    return isEmpty(parentQualifiedName) ? safeChildName : parentQualifiedName 
+ "." + safeChildName;
+  }
 
-    public static ObjectNode readSchema(URL schemaUrl) {
+  /**
+   * read Schema from URL.
+   * @param schemaUrl URL
+   * @return ObjectNode
+   */
+  public static ObjectNode readSchema(URL schemaUrl) {
 
-        ObjectNode schemaNode = NODE_FACTORY.objectNode();
-        schemaNode.put("$ref", schemaUrl.toString());
-        return schemaNode;
+    ObjectNode schemaNode = NODE_FACTORY.objectNode();
+    schemaNode.put("$ref", schemaUrl.toString());
+    return schemaNode;
 
-    }
+  }
 
-    public static ObjectNode mergeProperties(ObjectNode content, ObjectNode 
parent) {
+  /**
+   * merge parent and child properties maps.
+   * @param content ObjectNode
+   * @param parent ObjectNode
+   * @return merged ObjectNode
+   */
+  public static ObjectNode mergeProperties(ObjectNode content, ObjectNode 
parent) {
 
-        ObjectNode merged = parent.deepCopy();
-        Iterator<Map.Entry<String, JsonNode>> fields = content.fields();
-        for( ; fields.hasNext(); ) {
-            Map.Entry<String, JsonNode> field = fields.next();
-            String fieldId = field.getKey();
-            merged.put(fieldId, field.getValue().deepCopy());
-        }
-        return merged;
+    ObjectNode merged = parent.deepCopy();
+    Iterator<Map.Entry<String, JsonNode>> fields = content.fields();
+    for ( ; fields.hasNext(); ) {
+      Map.Entry<String, JsonNode> field = fields.next();
+      String fieldId = field.getKey();
+      merged.put(fieldId, field.getValue().deepCopy());
     }
+    return merged;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/schema/URIUtil.java
----------------------------------------------------------------------
diff --git 
a/streams-util/src/main/java/org/apache/streams/util/schema/URIUtil.java 
b/streams-util/src/main/java/org/apache/streams/util/schema/URIUtil.java
deleted file mode 100644
index d645675..0000000
--- a/streams-util/src/main/java/org/apache/streams/util/schema/URIUtil.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.streams.util.schema;
-
-import com.google.common.base.Optional;
-import org.apache.commons.lang3.StringUtils;
-
-import java.net.URI;
-
-/**
- * URIUtil contains methods to assist in resolving URIs and URI fragments.
- */
-public class URIUtil {
-
-    public static URI removeFragment(URI id) {
-        return URI.create(StringUtils.substringBefore(id.toString(), "#"));
-    }
-
-    public static URI removeFile(URI id) {
-        return URI.create(StringUtils.substringBeforeLast(id.toString(), "/"));
-    }
-
-    public static Optional<URI> safeResolve(URI absolute, String relativePart) 
{
-        if( !absolute.isAbsolute()) return Optional.absent();
-        try {
-            return Optional.of(absolute.resolve(relativePart));
-        } catch( IllegalArgumentException e ) {
-            return Optional.absent();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/schema/UriUtil.java
----------------------------------------------------------------------
diff --git 
a/streams-util/src/main/java/org/apache/streams/util/schema/UriUtil.java 
b/streams-util/src/main/java/org/apache/streams/util/schema/UriUtil.java
new file mode 100644
index 0000000..33b656d
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/schema/UriUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.util.schema;
+
+import com.google.common.base.Optional;
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.URI;
+
+/**
+ * UriUtil contains methods to assist in resolving URIs and URI fragments.
+ */
+public class UriUtil {
+
+  public static URI removeFragment(URI id) {
+    return URI.create(StringUtils.substringBefore(id.toString(), "#"));
+  }
+
+  public static URI removeFile(URI id) {
+    return URI.create(StringUtils.substringBeforeLast(id.toString(), "/"));
+  }
+
+  /**
+   * resolve a remote schema safely.
+   * @param absolute root URI
+   * @param relativePart relative to root
+   * @return URI if resolvable, or Optional.absent()
+   */
+  public static Optional<URI> safeResolve(URI absolute, String relativePart) {
+    if ( !absolute.isAbsolute()) {
+      return Optional.absent();
+    }
+    try {
+      return Optional.of(absolute.resolve(relativePart));
+    } catch ( IllegalArgumentException ex ) {
+      return Optional.absent();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/BackOffStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/BackOffStrategyTest.java
 
b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/BackOffStrategyTest.java
index 108813e..00380e6 100644
--- 
a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/BackOffStrategyTest.java
+++ 
b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/BackOffStrategyTest.java
@@ -24,64 +24,63 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * Unit Tets
+ * Unit Test for BackOffStrategy.
  */
 public class BackOffStrategyTest {
 
+  private class TestBackOff extends AbstractBackOffStrategy {
 
-    private class TestBackOff extends AbstractBackOffStrategy {
-
-        public TestBackOff(long sleep, int maxAttempts) {
-            super(sleep, maxAttempts);
-        }
-
-        @Override
-        protected long calculateBackOffTime(int attemptCount, long 
baseSleepTime) {
-            return baseSleepTime;
-        }
+    public TestBackOff(long sleep, int maxAttempts) {
+      super(sleep, maxAttempts);
     }
 
-    @Test
-    public void testUnlimitedBackOff() {
-        AbstractBackOffStrategy backOff = new TestBackOff(1, -1);
-        try {
-            for(int i=0; i < 100; ++i) {
-                backOff.backOff();
-            }
-        } catch (BackOffException boe) {
-            fail("Threw BackOffException.  Not expected action");
-        }
+    @Override
+    protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
+      return baseSleepTime;
     }
+  }
 
-    @Test
-    public void testLimitedUseBackOff()  {
-        AbstractBackOffStrategy backOff = new TestBackOff(1, 2);
-        try {
-            backOff.backOff();
-        } catch (BackOffException boe) {
-            fail("Threw BackOffExpection. Not expected action");
-        }
-        try {
-            backOff.backOff();
-        } catch (BackOffException boe) {
-            fail("Threw BackOffExpection. Not expected action");
-        }
-        try {
-            backOff.backOff();
-            fail("Expected BackOffException to be thrown.");
-        } catch (BackOffException boe) {
-
-        }
+  @Test
+  public void testUnlimitedBackOff() {
+    AbstractBackOffStrategy backOff = new TestBackOff(1, -1);
+    try {
+      for (int i = 0; i < 100; ++i) {
+        backOff.backOff();
+      }
+    } catch (BackOffException boe) {
+      fail("Threw BackOffException.  Not expected action");
     }
+  }
 
-    @Test
-    public void testBackOffSleep() throws BackOffException {
-        AbstractBackOffStrategy backOff = new TestBackOff(2000, 1);
-        long startTime = System.currentTimeMillis();
-        backOff.backOff();
-        long endTime = System.currentTimeMillis();
-        assertTrue(endTime - startTime >= 2000);
+  @Test
+  public void testLimitedUseBackOff()  {
+    AbstractBackOffStrategy backOff = new TestBackOff(1, 2);
+    try {
+      backOff.backOff();
+    } catch (BackOffException boe) {
+      fail("Threw BackOffExpection. Not expected action");
+    }
+    try {
+      backOff.backOff();
+    } catch (BackOffException boe) {
+      fail("Threw BackOffExpection. Not expected action");
     }
+    try {
+      backOff.backOff();
+      fail("Expected BackOffException to be thrown.");
+    } catch (BackOffException boe) {
+      //
+    }
+  }
+
+  @Test
+  public void testBackOffSleep() throws BackOffException {
+    AbstractBackOffStrategy backOff = new TestBackOff(2000, 1);
+    long startTime = System.currentTimeMillis();
+    backOff.backOff();
+    long endTime = System.currentTimeMillis();
+    assertTrue(endTime - startTime >= 2000);
+  }
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ConstantTimeBackOffStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ConstantTimeBackOffStrategyTest.java
 
b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ConstantTimeBackOffStrategyTest.java
index 0eedaa0..e10a7e2 100644
--- 
a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ConstantTimeBackOffStrategyTest.java
+++ 
b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ConstantTimeBackOffStrategyTest.java
@@ -18,27 +18,25 @@
 
 package org.apache.streams.util.api.requests.backoff;
 
-import com.carrotsearch.randomizedtesting.RandomizedTest;
 import 
org.apache.streams.util.api.requests.backoff.impl.ConstantTimeBackOffStrategy;
-import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import org.junit.Test;
 
 /**
- * Unit Tests
+ * Unit Test for BackOffStrategy.
  */
-public class ConstantTimeBackOffStrategyTest extends RandomizedTest{
-
-
-    @Test
-    public void constantTimeBackOffStategy() {
-        AbstractBackOffStrategy backOff = new ConstantTimeBackOffStrategy(1);
-        assertEquals(1, backOff.calculateBackOffTime(1,1));
-        assertEquals(1, backOff.calculateBackOffTime(2,1));
-        assertEquals(1, backOff.calculateBackOffTime(3,1));
-        assertEquals(1, backOff.calculateBackOffTime(4,1));
-        assertEquals(1, backOff.calculateBackOffTime(randomIntBetween(1, 
Integer.MAX_VALUE),1));
-    }
+public class ConstantTimeBackOffStrategyTest extends RandomizedTest {
+
+  @Test
+  public void constantTimeBackOffStategy() {
+    AbstractBackOffStrategy backOff = new ConstantTimeBackOffStrategy(1);
+    assertEquals(1, backOff.calculateBackOffTime(1,1));
+    assertEquals(1, backOff.calculateBackOffTime(2,1));
+    assertEquals(1, backOff.calculateBackOffTime(3,1));
+    assertEquals(1, backOff.calculateBackOffTime(4,1));
+    assertEquals(1, backOff.calculateBackOffTime(randomIntBetween(1, 
Integer.MAX_VALUE),1));
+  }
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ExponentialBackOffStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ExponentialBackOffStrategyTest.java
 
b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ExponentialBackOffStrategyTest.java
index d595254..70f25ec 100644
--- 
a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ExponentialBackOffStrategyTest.java
+++ 
b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ExponentialBackOffStrategyTest.java
@@ -19,23 +19,24 @@
 package org.apache.streams.util.api.requests.backoff;
 
 import 
org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
 /**
- * Unit Tests
+ * Unit Test for ExponentialBackOffStrategy.
  */
 public class ExponentialBackOffStrategyTest {
 
-    @Test
-    public void exponentialTimeBackOffStrategyTest() {
-        AbstractBackOffStrategy backOff = new ExponentialBackOffStrategy(1);
-        assertEquals(5000, backOff.calculateBackOffTime(1,5));
-        assertEquals(25000, backOff.calculateBackOffTime(2,5));
-        assertEquals(125000, backOff.calculateBackOffTime(3,5));
-        assertEquals(2000, backOff.calculateBackOffTime(1,2));
-        assertEquals(16000, backOff.calculateBackOffTime(4,2));
-    }
+  @Test
+  public void exponentialTimeBackOffStrategyTest() {
+    AbstractBackOffStrategy backOff = new ExponentialBackOffStrategy(1);
+    assertEquals(5000, backOff.calculateBackOffTime(1,5));
+    assertEquals(25000, backOff.calculateBackOffTime(2,5));
+    assertEquals(125000, backOff.calculateBackOffTime(3,5));
+    assertEquals(2000, backOff.calculateBackOffTime(1,2));
+    assertEquals(16000, backOff.calculateBackOffTime(4,2));
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStartegyTest.java
----------------------------------------------------------------------
diff --git 
a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStartegyTest.java
 
b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStartegyTest.java
deleted file mode 100644
index 8b3f384..0000000
--- 
a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStartegyTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.util.api.requests.backoff;
-
-import 
org.apache.streams.util.api.requests.backoff.impl.LinearTimeBackOffStrategy;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Unit Tests
- */
-public class LinearTimeBackOffStartegyTest {
-
-    @Test
-    public void linearTimeBackOffStrategyTest() {
-        AbstractBackOffStrategy backOff = new LinearTimeBackOffStrategy(1);
-        assertEquals(1000, backOff.calculateBackOffTime(1,1));
-        assertEquals(2000, backOff.calculateBackOffTime(2,1));
-        assertEquals(3000, backOff.calculateBackOffTime(3,1));
-        assertEquals(4000, backOff.calculateBackOffTime(4,1));
-        assertEquals(25000, backOff.calculateBackOffTime(5,5));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStrategyTest.java
 
b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStrategyTest.java
new file mode 100644
index 0000000..9477b64
--- /dev/null
+++ 
b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStrategyTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.util.api.requests.backoff;
+
+import 
org.apache.streams.util.api.requests.backoff.impl.LinearTimeBackOffStrategy;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit Test for LinearTimeBackOffStrategy.
+ */
+public class LinearTimeBackOffStrategyTest {
+
+  @Test
+  public void linearTimeBackOffStrategyTest() {
+    AbstractBackOffStrategy backOff = new LinearTimeBackOffStrategy(1);
+    assertEquals(1000, backOff.calculateBackOffTime(1,1));
+    assertEquals(2000, backOff.calculateBackOffTime(2,1));
+    assertEquals(3000, backOff.calculateBackOffTime(3,1));
+    assertEquals(4000, backOff.calculateBackOffTime(4,1));
+    assertEquals(25000, backOff.calculateBackOffTime(5,5));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/test/java/org/apache/streams/util/files/StreamsScannerUtil.java
----------------------------------------------------------------------
diff --git 
a/streams-util/src/test/java/org/apache/streams/util/files/StreamsScannerUtil.java
 
b/streams-util/src/test/java/org/apache/streams/util/files/StreamsScannerUtil.java
index 576cef0..97aafd7 100644
--- 
a/streams-util/src/test/java/org/apache/streams/util/files/StreamsScannerUtil.java
+++ 
b/streams-util/src/test/java/org/apache/streams/util/files/StreamsScannerUtil.java
@@ -18,7 +18,6 @@
 
 package org.apache.streams.util.files;
 
-import java.io.File;
 import java.io.InputStream;
 import java.util.Scanner;
 import java.util.regex.Pattern;
@@ -28,12 +27,17 @@ import java.util.regex.Pattern;
  */
 public class StreamsScannerUtil {
 
-    protected static Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", 
Pattern.MULTILINE);
+  protected static Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", 
Pattern.MULTILINE);
 
-    public static Scanner getInstance(String resourcePath) {
+  /**
+   * get instance of Scanner using resource path.
+   * @param resourcePath resourcePath
+   * @return Scanner
+   */
+  public static Scanner getInstance(String resourcePath) {
 
-        InputStream testFileStream = 
StreamsScannerUtil.class.getResourceAsStream(resourcePath);
-        return new Scanner(testFileStream, 
"UTF-8").useDelimiter(newLinePattern);
+    InputStream testFileStream = 
StreamsScannerUtil.class.getResourceAsStream(resourcePath);
+    return new Scanner(testFileStream, "UTF-8").useDelimiter(newLinePattern);
 
-    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
----------------------------------------------------------------------
diff --git 
a/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
 
b/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
index c08b68b..b799fce 100644
--- 
a/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
+++ 
b/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
@@ -19,7 +19,8 @@
 package org.apache.streams.util.oauth.tokens.tokenmanager;
 
 import org.apache.streams.util.oauth.tokens.AbstractOauthToken;
-import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger;
+import 
org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManager;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -29,165 +30,171 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
- * Unit tests for BasticTokenManager
+ * Unit tests for BasicTokenManager.
  */
 public class TestBasicTokenManager {
 
-    /**
-     * Simple token for testing purposes
-     */
-    private class TestToken extends AbstractOauthToken {
-
-        private String s;
+  /**
+   * Simple token for testing purposes.
+   */
+  private class TestToken extends AbstractOauthToken {
 
-        public TestToken(String s) {
-            this.s = s;
-        }
+    private String token;
 
-        @Override
-        protected boolean internalEquals(Object o) {
-            if(!(o instanceof TestToken))
-                return false;
-            TestToken that = (TestToken) o;
-            return this.s.equals(that.s);
-        }
+    public TestToken(String token) {
+      this.token = token;
     }
 
-    @Test
-    public void testNoArgConstructor() {
-        try {
-            BasicTokenManger manager = new BasicTokenManger<TestToken>();
-            assertEquals(0, manager.numAvailableTokens());
-        } catch (Throwable t) {
-            fail("Constructors threw error: "+t.getMessage());
-        }
+    @Override
+    protected boolean internalEquals(Object otherToken) {
+      if (!(otherToken instanceof TestToken)) {
+        return false;
+      }
+      TestToken that = (TestToken) otherToken;
+      return this.token.equals(that.token);
     }
-
-    @Test
-    public void testCollectionConstructor() {
-        List<TestToken> tokens = new LinkedList<TestToken>();
-        try {
-            BasicTokenManger manager1 = new 
BasicTokenManger<TestToken>(tokens);
-            tokens.add(new TestToken("a"));
-            tokens.add(new TestToken("b"));
-            assertEquals(0, manager1.numAvailableTokens());
-            BasicTokenManger manager2 = new 
BasicTokenManger<TestToken>(tokens);
-            assertEquals(2, manager2.numAvailableTokens());
-            assertEquals(0, manager1.numAvailableTokens());
-        } catch (Throwable t) {
-            fail("Constructors threw error: "+t.getMessage());
-        }
+  }
+
+  @Test
+  public void testNoArgConstructor() {
+    try {
+      BasicTokenManager manager = new BasicTokenManager<TestToken>();
+      assertEquals(0, manager.numAvailableTokens());
+    } catch (Throwable throwable) {
+      fail("Constructors threw error: " + throwable.getMessage());
     }
-
-    @Test
-    public void testAddTokenToPool() {
-        BasicTokenManger<TestToken> manager = new 
BasicTokenManger<TestToken>();
-        assertTrue(manager.addTokenToPool(new TestToken("a")));
-        assertEquals(1, manager.numAvailableTokens());
-        assertFalse(manager.addTokenToPool(new TestToken("a")));
-        assertEquals(1, manager.numAvailableTokens());
-        assertTrue(manager.addTokenToPool(new TestToken("b")));
-        assertEquals(2, manager.numAvailableTokens());
+  }
+
+  @Test
+  public void testCollectionConstructor() {
+    List<TestToken> tokens = new LinkedList<TestToken>();
+    try {
+      BasicTokenManager manager1 = new BasicTokenManager<TestToken>(tokens);
+      tokens.add(new TestToken("a"));
+      tokens.add(new TestToken("b"));
+      assertEquals(0, manager1.numAvailableTokens());
+      BasicTokenManager manager2 = new BasicTokenManager<TestToken>(tokens);
+      assertEquals(2, manager2.numAvailableTokens());
+      assertEquals(0, manager1.numAvailableTokens());
+    } catch (Throwable throwable) {
+      fail("Constructors threw error: " + throwable.getMessage());
     }
-
-    @Test
-    public void testAddAllTokensToPool() {
-        BasicTokenManger<TestToken> manager = new 
BasicTokenManger<TestToken>();
-        List<TestToken> tokens = new ArrayList<TestToken>();
-        tokens.add(new TestToken("a"));
-        tokens.add(new TestToken("b"));
-        tokens.add(new TestToken("c"));
-        assertTrue(manager.addAllTokensToPool(tokens));
-        assertEquals(3, manager.numAvailableTokens());
-        assertFalse(manager.addAllTokensToPool(tokens));
-        assertEquals(3, manager.numAvailableTokens());
-        tokens.add(new TestToken("d"));
-        assertTrue(manager.addAllTokensToPool(tokens));
-        assertEquals(4, manager.numAvailableTokens());
+  }
+
+  @Test
+  public void testAddTokenToPool() {
+    BasicTokenManager<TestToken> manager = new BasicTokenManager<TestToken>();
+    assertTrue(manager.addTokenToPool(new TestToken("a")));
+    assertEquals(1, manager.numAvailableTokens());
+    assertFalse(manager.addTokenToPool(new TestToken("a")));
+    assertEquals(1, manager.numAvailableTokens());
+    assertTrue(manager.addTokenToPool(new TestToken("b")));
+    assertEquals(2, manager.numAvailableTokens());
+  }
+
+  @Test
+  public void testAddAllTokensToPool() {
+    List<TestToken> tokens = new ArrayList<TestToken>();
+    tokens.add(new TestToken("a"));
+    tokens.add(new TestToken("b"));
+    tokens.add(new TestToken("c"));
+    BasicTokenManager<TestToken> manager = new BasicTokenManager<TestToken>();
+    assertTrue(manager.addAllTokensToPool(tokens));
+    assertEquals(3, manager.numAvailableTokens());
+    assertFalse(manager.addAllTokensToPool(tokens));
+    assertEquals(3, manager.numAvailableTokens());
+    tokens.add(new TestToken("d"));
+    assertTrue(manager.addAllTokensToPool(tokens));
+    assertEquals(4, manager.numAvailableTokens());
+  }
+
+  @Test
+  public void testGetNextAvailableToken() {
+    BasicTokenManager manager = new BasicTokenManager<TestToken>();
+    assertNull(manager.getNextAvailableToken());
+    TestToken tokenA = new TestToken("a");
+    assertTrue(manager.addTokenToPool(tokenA));
+    assertEquals(tokenA, manager.getNextAvailableToken());
+    assertEquals(tokenA, manager.getNextAvailableToken());
+    assertEquals(tokenA, manager.getNextAvailableToken());
+
+    TestToken tokenB = new TestToken("b");
+    TestToken tokenC = new TestToken("c");
+    assertTrue(manager.addTokenToPool(tokenB));
+    assertTrue(manager.addTokenToPool(tokenC));
+    assertEquals(tokenA, manager.getNextAvailableToken());
+    assertEquals(tokenB, manager.getNextAvailableToken());
+    assertEquals(tokenC, manager.getNextAvailableToken());
+    assertEquals(tokenA, manager.getNextAvailableToken());
+    assertEquals(tokenB, manager.getNextAvailableToken());
+    assertEquals(tokenC, manager.getNextAvailableToken());
+  }
+
+  @Test
+  public void testMultiThreadSafety() {
+    int numThreads = 10;
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    CountDownLatch startLatch = new CountDownLatch(1);
+    CountDownLatch finishLatch = new CountDownLatch(numThreads);
+    BasicTokenManager<TestToken> manager = new BasicTokenManager<TestToken>();
+    for (int i = 0; i < numThreads; ++i) {
+      assertTrue(manager.addTokenToPool(new TestToken(String.valueOf(i))));
     }
-
-    @Test
-    public void testGetNextAvailableToken() {
-        BasicTokenManger manager = new BasicTokenManger<TestToken>();
-        assertNull(manager.getNextAvailableToken());
-        TestToken tokenA = new TestToken("a");
-        assertTrue(manager.addTokenToPool(tokenA));
-        assertEquals(tokenA, manager.getNextAvailableToken());
-        assertEquals(tokenA, manager.getNextAvailableToken());
-        assertEquals(tokenA, manager.getNextAvailableToken());
-
-        TestToken tokenB = new TestToken("b");
-        TestToken tokenC = new TestToken("c");
-        assertTrue(manager.addTokenToPool(tokenB));
-        assertTrue(manager.addTokenToPool(tokenC));
-        assertEquals(tokenA, manager.getNextAvailableToken());
-        assertEquals(tokenB, manager.getNextAvailableToken());
-        assertEquals(tokenC, manager.getNextAvailableToken());
-        assertEquals(tokenA, manager.getNextAvailableToken());
-        assertEquals(tokenB, manager.getNextAvailableToken());
-        assertEquals(tokenC, manager.getNextAvailableToken());
+    for (int i = 0; i < numThreads; ++i) {
+      executor.submit(new TestThread(manager, startLatch, finishLatch, 
numThreads));
     }
-
-    @Test
-    public void testMultiThreadSafety() {
-        int numThreads = 10;
-        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
-        CountDownLatch startLatch = new CountDownLatch(1);
-        CountDownLatch finishLatch = new CountDownLatch(numThreads);
-        BasicTokenManger<TestToken> manager = new 
BasicTokenManger<TestToken>();
-        for(int i=0; i < numThreads; ++i) {
-            assertTrue(manager.addTokenToPool(new 
TestToken(String.valueOf(i))));
-        }
-        for(int i=0; i < numThreads; ++i) {
-            executor.submit(new TestThread(manager, startLatch, finishLatch, 
numThreads));
-        }
-        try {
-            Thread.sleep(2000); //sleep for 2 seconds so other threads can 
initialize
-            startLatch.countDown();
-            finishLatch.await();
-            assertTrue("No errors were thrown during thead safe check", true);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-        } catch (Throwable t) {
-            fail("Error occured durring thread safe test : "+t.getMessage());
-        }
+    try {
+      Thread.sleep(2000); //sleep for 2 seconds so other threads can initialize
+      startLatch.countDown();
+      finishLatch.await();
+      assertTrue("No errors were thrown during thead safe check", true);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    } catch (Throwable throwable) {
+      fail("Error occured durring thread safe test : " + 
throwable.getMessage());
+    }
+  }
+
+  /**
+   * Test class for thread safe check.
+   */
+  private class TestThread implements Runnable {
+
+    private BasicTokenManager<TestToken> manager;
+    private CountDownLatch startLatch;
+    private CountDownLatch finishedLatch;
+    private int availableTokens;
+
+    public TestThread(BasicTokenManager<TestToken> manager, CountDownLatch 
startLatch, CountDownLatch finishedLatch, int availableTokens) {
+      this.manager = manager;
+      this.startLatch = startLatch;
+      this.finishedLatch = finishedLatch;
+      this.availableTokens = availableTokens;
     }
 
-    /**
-     * Test class for thread safe check.
-     */
-    private class TestThread implements Runnable {
-
-        private BasicTokenManger<TestToken> manager;
-        private CountDownLatch startLatch;
-        private CountDownLatch finishedLatch;
-        private int availableTokens;
-
-        public TestThread(BasicTokenManger<TestToken> manager, CountDownLatch 
startLatch, CountDownLatch finishedLatch, int availableTokens) {
-            this.manager = manager;
-            this.startLatch = startLatch;
-            this.finishedLatch = finishedLatch;
-            this.availableTokens = availableTokens;
-        }
-
-        @Override
-        public void run() {
-            try {
-                this.startLatch.await();
-                for(int i=0; i < 1000; ++i) {
-                    assertNotNull(this.manager.getNextAvailableToken());
-                    assertEquals(this.availableTokens, 
this.manager.numAvailableTokens());
-                }
-                this.finishedLatch.countDown();
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-            } catch (Throwable t) {
-                fail("Threw error in multithread test : "+t.getMessage());
-            }
+    @Override
+    public void run() {
+      try {
+        this.startLatch.await();
+        for (int i = 0; i < 1000; ++i) {
+          assertNotNull(this.manager.getNextAvailableToken());
+          assertEquals(this.availableTokens, 
this.manager.numAvailableTokens());
         }
+        this.finishedLatch.countDown();
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      } catch (Throwable throwable) {
+        fail("Threw error in multithread test : " + throwable.getMessage());
+      }
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/test/java/org/apache/streams/util/schema/test/SchemaOrderingTest.java
----------------------------------------------------------------------
diff --git 
a/streams-util/src/test/java/org/apache/streams/util/schema/test/SchemaOrderingTest.java
 
b/streams-util/src/test/java/org/apache/streams/util/schema/test/SchemaOrderingTest.java
index 1a72ff6..c155b67 100644
--- 
a/streams-util/src/test/java/org/apache/streams/util/schema/test/SchemaOrderingTest.java
+++ 
b/streams-util/src/test/java/org/apache/streams/util/schema/test/SchemaOrderingTest.java
@@ -15,15 +15,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.util.schema.test;
 
+import org.apache.streams.util.schema.Schema;
+import org.apache.streams.util.schema.SchemaStore;
+import org.apache.streams.util.schema.SchemaStoreImpl;
+
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import org.apache.streams.util.schema.Schema;
-import org.apache.streams.util.schema.SchemaStore;
-import org.apache.streams.util.schema.SchemaStoreImpl;
 import org.junit.Test;
 
 import java.io.File;
@@ -35,133 +37,137 @@ import java.util.List;
  */
 public class SchemaOrderingTest {
 
-    @Test
-    public void compareVerbParent() {
-        SchemaStore schemaStore = new SchemaStoreImpl();
-        File update = new 
File("target/test-classes/activitystreams-schemas/verbs/update.json");
-        schemaStore.create(update.toURI());
-        File activity = new 
File("target/test-classes/activitystreams-schemas/activity.json");
-        schemaStore.create(activity.toURI());
-        assert( schemaStore.compare( 
schemaStore.getByUri(update.toURI()).get(), 
schemaStore.getByUri(activity.toURI()).get()) == 1);
-        Iterator<Schema> schemaIterator = schemaStore.getSchemaIterator();
-        assertContainsItemsEndingWithInOrder(
-                schemaIterator,
-                Lists.newArrayList(
-                        "activity.json",
-                        "update.json"
-                )
-        );
+  @Test
+  public void compareVerbParent() {
+    SchemaStore schemaStore = new SchemaStoreImpl();
+    File update = new 
File("target/test-classes/activitystreams-schemas/verbs/update.json");
+    schemaStore.create(update.toURI());
+    File activity = new 
File("target/test-classes/activitystreams-schemas/activity.json");
+    schemaStore.create(activity.toURI());
+    assert ( schemaStore.compare( schemaStore.getByUri(update.toURI()).get(), 
schemaStore.getByUri(activity.toURI()).get()) == 1);
+    Iterator<Schema> schemaIterator = schemaStore.getSchemaIterator();
+    assertContainsItemsEndingWithInOrder(
+        schemaIterator,
+        Lists.newArrayList(
+            "activity.json",
+            "update.json"
+        )
+    );
+  }
+
+  @Test
+  public void compareObjectTypeParent() {
+    SchemaStore schemaStore = new SchemaStoreImpl();
+    File alert = new 
File("target/test-classes/activitystreams-schemas/objectTypes/alert.json");
+    schemaStore.create(alert.toURI());
+    File object = new 
File("target/test-classes/activitystreams-schemas/object.json");
+    schemaStore.create(object.toURI());
+    assert ( schemaStore.compare( schemaStore.getByUri(object.toURI()).get(), 
schemaStore.getByUri(alert.toURI()).get()) == -1);
+    Iterator<Schema> schemaIterator = schemaStore.getSchemaIterator();
+    assertContainsItemsEndingWithInOrder(
+        schemaIterator,
+        Lists.newArrayList(
+            "object.json",
+            "alert.json"
+        )
+    );
+  }
+
+  @Test
+  public void compareUnrelated() {
+    SchemaStore schemaStore = new SchemaStoreImpl();
+    File alert = new 
File("target/test-classes/activitystreams-schemas/objectTypes/alert.json");
+    schemaStore.create(alert.toURI());
+    File update = new 
File("target/test-classes/activitystreams-schemas/verbs/update.json");
+    schemaStore.create(update.toURI());
+    assert ( schemaStore.compare( schemaStore.getByUri(alert.toURI()).get(), 
schemaStore.getByUri(update.toURI()).get()) == 0);
+  }
+
+  @Test
+  public void compareVerbFieldRef() {
+    SchemaStore schemaStore = new SchemaStoreImpl();
+    File update = new 
File("target/test-classes/activitystreams-schemas/verbs/update.json");
+    schemaStore.create(update.toURI());
+    File object = new 
File("target/test-classes/activitystreams-schemas/object.json");
+    schemaStore.create(object.toURI());
+    assert ( schemaStore.compare( schemaStore.getByUri(update.toURI()).get(), 
schemaStore.getByUri(object.toURI()).get()) == 1);
+    Iterator<Schema> schemaIterator = schemaStore.getSchemaIterator();
+    assertContainsItemsEndingWithInOrder(
+        schemaIterator,
+        Lists.newArrayList(
+            "object.json",
+            "update.json"
+        )
+    );
+  }
+
+  @Test
+  public void compareObjectTypeFieldRef() {
+    SchemaStore schemaStore = new SchemaStoreImpl();
+    File alert = new 
File("target/test-classes/activitystreams-schemas/objectTypes/alert.json");
+    schemaStore.create(alert.toURI());
+    File mediaLink = new 
File("target/test-classes/activitystreams-schemas/media_link.json");
+    schemaStore.create(mediaLink.toURI());
+    assert ( schemaStore.compare( 
schemaStore.getByUri(mediaLink.toURI()).get(), 
schemaStore.getByUri(alert.toURI()).get()) == -1);
+    Iterator<Schema> schemaIterator = schemaStore.getSchemaIterator();
+    assertContainsItemsEndingWithInOrder(
+        schemaIterator,
+        Lists.newArrayList(
+            "media_link.json",
+            "object.json",
+            "alert.json"
+        )
+    );
+  }
+
+  @Test
+  public void compareVerbAncestorIndirect() {
+    SchemaStore schemaStore = new SchemaStoreImpl();
+    File update = new 
File("target/test-classes/activitystreams-schemas/verbs/update.json");
+    schemaStore.create(update.toURI());
+    File mediaLink = new 
File("target/test-classes/activitystreams-schemas/media_link.json");
+    schemaStore.create(mediaLink.toURI());
+    assert ( schemaStore.getByUri(mediaLink.toURI()).isPresent());
+    assert ( schemaStore.getByUri(update.toURI()).isPresent());
+    assert ( schemaStore.compare( 
schemaStore.getByUri(mediaLink.toURI()).get(), 
schemaStore.getByUri(update.toURI()).get()) == -1);
+    Iterator<Schema> schemaIterator = schemaStore.getSchemaIterator();
+    assertContainsItemsEndingWithInOrder(
+        schemaIterator,
+        Lists.newArrayList(
+            "media_link.json",
+            "update.json"
+        )
+    );
+  }
+
+  /**
+   * assert iterator of Schema contains URI items ending with in order.
+   * @param iterator Iterator of Schema
+   * @param items List of String
+   */
+  public void assertContainsItemsEndingWithInOrder(Iterator<Schema> iterator, 
List<String> items) {
+    for ( String item : items ) {
+      Optional<Schema> tryFind = Iterators.tryFind( iterator, new 
SchemaUriEndsWithPredicate(item) );
+      assert ( tryFind.isPresent() );
     }
+  }
 
-    @Test
-    public void compareObjectTypeParent() {
-        SchemaStore schemaStore = new SchemaStoreImpl();
-        File alert = new 
File("target/test-classes/activitystreams-schemas/objectTypes/alert.json");
-        schemaStore.create(alert.toURI());
-        File object = new 
File("target/test-classes/activitystreams-schemas/object.json");
-        schemaStore.create(object.toURI());
-        assert( schemaStore.compare( 
schemaStore.getByUri(object.toURI()).get(), 
schemaStore.getByUri(alert.toURI()).get()) == -1);
-        Iterator<Schema> schemaIterator = schemaStore.getSchemaIterator();
-        assertContainsItemsEndingWithInOrder(
-                schemaIterator,
-                Lists.newArrayList(
-                        "object.json",
-                        "alert.json"
-                )
-        );
-    }
+  public class SchemaUriEndsWithPredicate implements Predicate<Schema> {
 
-    @Test
-    public void compareUnrelated() {
-        SchemaStore schemaStore = new SchemaStoreImpl();
-        File alert = new 
File("target/test-classes/activitystreams-schemas/objectTypes/alert.json");
-        schemaStore.create(alert.toURI());
-        File update = new 
File("target/test-classes/activitystreams-schemas/verbs/update.json");
-        schemaStore.create(update.toURI());
-        assert( schemaStore.compare( 
schemaStore.getByUri(alert.toURI()).get(), 
schemaStore.getByUri(update.toURI()).get()) == 0);
-    }
+    private String endsWith;
 
-    @Test
-    public void compareVerbFieldRef() {
-        SchemaStore schemaStore = new SchemaStoreImpl();
-        File update = new 
File("target/test-classes/activitystreams-schemas/verbs/update.json");
-        schemaStore.create(update.toURI());
-        File object = new 
File("target/test-classes/activitystreams-schemas/object.json");
-        schemaStore.create(object.toURI());
-        assert( schemaStore.compare( 
schemaStore.getByUri(update.toURI()).get(), 
schemaStore.getByUri(object.toURI()).get()) == 1);
-        Iterator<Schema> schemaIterator = schemaStore.getSchemaIterator();
-        assertContainsItemsEndingWithInOrder(
-                schemaIterator,
-                Lists.newArrayList(
-                        "object.json",
-                        "update.json"
-                )
-        );
+    public SchemaUriEndsWithPredicate(String endsWith) {
+      this.endsWith = endsWith;
     }
 
-    @Test
-    public void compareObjectTypeFieldRef() {
-        SchemaStore schemaStore = new SchemaStoreImpl();
-        File alert = new 
File("target/test-classes/activitystreams-schemas/objectTypes/alert.json");
-        schemaStore.create(alert.toURI());
-        File media_link = new 
File("target/test-classes/activitystreams-schemas/media_link.json");
-        schemaStore.create(media_link.toURI());
-        assert( schemaStore.compare( 
schemaStore.getByUri(media_link.toURI()).get(), 
schemaStore.getByUri(alert.toURI()).get()) == -1);
-        Iterator<Schema> schemaIterator = schemaStore.getSchemaIterator();
-        assertContainsItemsEndingWithInOrder(
-                schemaIterator,
-                Lists.newArrayList(
-                        "media_link.json",
-                        "object.json",
-                        "alert.json"
-                )
-        );
+    @Override
+    public boolean apply(Schema input) {
+      return input.getUri().getPath().endsWith(endsWith);
     }
 
-    @Test
-    public void compareVerbAncestorIndirect() {
-        SchemaStore schemaStore = new SchemaStoreImpl();
-        File update = new 
File("target/test-classes/activitystreams-schemas/verbs/update.json");
-        schemaStore.create(update.toURI());
-        File media_link = new 
File("target/test-classes/activitystreams-schemas/media_link.json");
-        schemaStore.create(media_link.toURI());
-        assert( schemaStore.getByUri(media_link.toURI()).isPresent());
-        assert( schemaStore.getByUri(update.toURI()).isPresent());
-        assert( schemaStore.compare( 
schemaStore.getByUri(media_link.toURI()).get(), 
schemaStore.getByUri(update.toURI()).get()) == -1);
-        Iterator<Schema> schemaIterator = schemaStore.getSchemaIterator();
-        assertContainsItemsEndingWithInOrder(
-                schemaIterator,
-                Lists.newArrayList(
-                        "media_link.json",
-                        "update.json"
-                )
-        );
-    }
-
-
-    public void assertContainsItemsEndingWithInOrder(Iterator<Schema> 
iterator, List<String> items) {
-        for( String item : items ) {
-            Optional<Schema> tryFind = Iterators.tryFind( iterator, new 
SchemaUriEndsWithPredicate(item) );
-            assert( tryFind.isPresent() );
-        }
-    }
-
-    public class SchemaUriEndsWithPredicate implements Predicate<Schema> {
-
-        private String endsWith;
-
-        public SchemaUriEndsWithPredicate(String endsWith) {
-            this.endsWith = endsWith;
-        }
-
-        @Override
-        public boolean apply(Schema input) {
-            return input.getURI().getPath().endsWith(endsWith);
-        }
-
-        @Override
-        public boolean equals(Object object) {
-            return false;
-        }
+    @Override
+    public boolean equals(Object object) {
+      return false;
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/test/java/org/apache/streams/util/schema/test/SchemaStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams-util/src/test/java/org/apache/streams/util/schema/test/SchemaStoreTest.java
 
b/streams-util/src/test/java/org/apache/streams/util/schema/test/SchemaStoreTest.java
index 1dce654..6dad615 100644
--- 
a/streams-util/src/test/java/org/apache/streams/util/schema/test/SchemaStoreTest.java
+++ 
b/streams-util/src/test/java/org/apache/streams/util/schema/test/SchemaStoreTest.java
@@ -15,71 +15,72 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.util.schema.test;
 
 import org.apache.streams.util.schema.Schema;
 import org.apache.streams.util.schema.SchemaStore;
 import org.apache.streams.util.schema.SchemaStoreImpl;
+
 import org.junit.Test;
 
 import java.io.File;
-import java.net.URI;
 
 /**
  * Created by sblackmon on 5/2/16.
  */
 public class SchemaStoreTest {
 
-    @Test
-    public void indexMediaLink() {
-        SchemaStore schemaStore = new SchemaStoreImpl();
-        File file = new 
File("target/test-classes/activitystreams-schemas/media_link.json");
-        schemaStore.create(file.toURI());
-        assert( schemaStore.getFileUriCount() == 1);
-        assert( schemaStore.getByUri(file.toURI()).isPresent());
-        assert( 
schemaStore.getById(schemaStore.getByUri(file.toURI()).get().getId()).isPresent());
-    }
+  @Test
+  public void indexMediaLink() {
+    SchemaStore schemaStore = new SchemaStoreImpl();
+    File file = new 
File("target/test-classes/activitystreams-schemas/media_link.json");
+    schemaStore.create(file.toURI());
+    assert ( schemaStore.getFileUriCount() == 1);
+    assert ( schemaStore.getByUri(file.toURI()).isPresent());
+    assert ( 
schemaStore.getById(schemaStore.getByUri(file.toURI()).get().getId()).isPresent());
+  }
 
-    @Test
-    public void indexApprove() {
-        SchemaStore schemaStore = new SchemaStoreImpl();
-        File file = new 
File("target/test-classes/activitystreams-schemas/verbs/approve.json");
-        schemaStore.create(file.toURI());
-        assert( schemaStore.getFileUriCount() == 4);
-        assert( schemaStore.getByUri(file.toURI()).isPresent());
-        assert( 
schemaStore.getById(schemaStore.getByUri(file.toURI()).get().getId()).isPresent());
-    }
+  @Test
+  public void indexApprove() {
+    SchemaStore schemaStore = new SchemaStoreImpl();
+    File file = new 
File("target/test-classes/activitystreams-schemas/verbs/approve.json");
+    schemaStore.create(file.toURI());
+    assert ( schemaStore.getFileUriCount() == 4);
+    assert ( schemaStore.getByUri(file.toURI()).isPresent());
+    assert ( 
schemaStore.getById(schemaStore.getByUri(file.toURI()).get().getId()).isPresent());
+  }
 
-    @Test
-    public void indexCollection() {
-        SchemaStore schemaStore = new SchemaStoreImpl();
-        File file = new 
File("target/test-classes/activitystreams-schemas/collection.json");
-        schemaStore.create(file.toURI());
-        assert( schemaStore.getFileUriCount() == 3);
-        assert( schemaStore.getByUri(file.toURI()).isPresent());
-        assert( 
schemaStore.getById(schemaStore.getByUri(file.toURI()).get().getId()).isPresent());
-        Schema collection = schemaStore.getByUri(file.toURI()).get();
-        assert( collection.getParent() == null );
-    }
+  @Test
+  public void indexCollection() {
+    SchemaStore schemaStore = new SchemaStoreImpl();
+    File file = new 
File("target/test-classes/activitystreams-schemas/collection.json");
+    schemaStore.create(file.toURI());
+    assert ( schemaStore.getFileUriCount() == 3);
+    assert ( schemaStore.getByUri(file.toURI()).isPresent());
+    assert ( 
schemaStore.getById(schemaStore.getByUri(file.toURI()).get().getId()).isPresent());
+    Schema collection = schemaStore.getByUri(file.toURI()).get();
+    assert ( collection.getParent() == null );
+  }
 
-    @Test
-    public void indexUpdate() {
-        SchemaStore schemaStore = new SchemaStoreImpl();
-        File file = new 
File("target/test-classes/activitystreams-schemas/verbs/update.json");
-        schemaStore.create(file.toURI());
-        assert( schemaStore.getFileUriCount() == 4);
-        assert( schemaStore.getByUri(file.toURI()).isPresent());
-        assert( 
schemaStore.getById(schemaStore.getByUri(file.toURI()).get().getId()).isPresent());
-        Schema update = schemaStore.getByUri(file.toURI()).get();
-        assert( update.getParent() != null );
-        File parentFile = new 
File("target/test-classes/activitystreams-schemas/activity.json");
-        Schema parent = schemaStore.getByUri(parentFile.toURI()).get();
-        assert( parent != null );
-        assert( update.getParentURI().equals(parent.getURI()));
-    }
+  @Test
+  public void indexUpdate() {
+    SchemaStore schemaStore = new SchemaStoreImpl();
+    File file = new 
File("target/test-classes/activitystreams-schemas/verbs/update.json");
+    schemaStore.create(file.toURI());
+    assert ( schemaStore.getFileUriCount() == 4);
+    assert ( schemaStore.getByUri(file.toURI()).isPresent());
+    assert ( 
schemaStore.getById(schemaStore.getByUri(file.toURI()).get().getId()).isPresent());
+    Schema update = schemaStore.getByUri(file.toURI()).get();
+    assert ( update.getParent() != null );
+    File parentFile = new 
File("target/test-classes/activitystreams-schemas/activity.json");
+    Schema parent = schemaStore.getByUri(parentFile.toURI()).get();
+    assert ( parent != null );
+    assert ( update.getParentUri().equals(parent.getUri()));
+  }
 
-    // test create from messed up URI
+  // test create from messed up URI
 
-    // test create from URI with messed up reference
+  // test create from URI with messed up reference
 
 }


Reply via email to