http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraResourceGenerator.java
----------------------------------------------------------------------
diff --git 
a/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraResourceGenerator.java
 
b/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraResourceGenerator.java
index 7f8eb25..7afa9fc 100644
--- 
a/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraResourceGenerator.java
+++ 
b/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraResourceGenerator.java
@@ -16,8 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.plugins.cassandra;
 
+import org.apache.streams.util.schema.FieldType;
+import org.apache.streams.util.schema.FieldUtil;
+import org.apache.streams.util.schema.GenerationConfig;
+import org.apache.streams.util.schema.Schema;
+import org.apache.streams.util.schema.SchemaStore;
+import org.apache.streams.util.schema.SchemaStoreImpl;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Joiner;
@@ -25,12 +33,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
-import org.apache.streams.util.schema.FieldType;
-import org.apache.streams.util.schema.FieldUtil;
-import org.apache.streams.util.schema.GenerationConfig;
-import org.apache.streams.util.schema.Schema;
-import org.apache.streams.util.schema.SchemaStore;
-import org.apache.streams.util.schema.SchemaStoreImpl;
 import org.jsonschema2pojo.util.URLUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,341 +46,369 @@ import java.util.List;
 import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.commons.lang3.StringUtils.defaultString;
 import static org.apache.streams.util.schema.FileUtil.dropExtension;
 import static org.apache.streams.util.schema.FileUtil.dropSourcePathPrefix;
 import static org.apache.streams.util.schema.FileUtil.resolveRecursive;
 import static org.apache.streams.util.schema.FileUtil.writeFile;
 
 /**
- * Created by sblackmon on 5/3/16.
+ * Resource Generator for Cassandra.
  */
 public class StreamsCassandraResourceGenerator implements Runnable {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(StreamsCassandraResourceGenerator.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamsCassandraResourceGenerator.class);
 
-    private final static String LS = System.getProperty("line.separator");
+  private static final String LS = System.getProperty("line.separator");
 
-    private StreamsCassandraGenerationConfig config;
+  private StreamsCassandraGenerationConfig config;
 
-    private SchemaStore schemaStore = new SchemaStoreImpl();
+  private SchemaStore schemaStore = new SchemaStoreImpl();
 
-    private int currentDepth = 0;
+  private int currentDepth = 0;
 
-    public static void main(String[] args) {
-        StreamsCassandraGenerationConfig config = new 
StreamsCassandraGenerationConfig();
+  /**
+   * Run from the command line without Maven.
+   *
+   * <p/>
+   * java -cp streams-plugin-cassandra-jar-with-dependencies.jar 
org.apache.streams.plugins.cassandra.StreamsCassandraResourceGenerator src/main/jsonschema target/generated-resources
+   *
+   * @param args [sourceDirectory, targetDirectory]
+   */
+  public static void main(String[] args) {
+    StreamsCassandraGenerationConfig config = new 
StreamsCassandraGenerationConfig();
 
-        String sourceDirectory = "./src/main/jsonschema";
-        String targetDirectory = "./target/generated-resources/cassandra";
+    String sourceDirectory = "./src/main/jsonschema";
+    String targetDirectory = "./target/generated-resources/cassandra";
 
-        if( args.length > 0 )
-            sourceDirectory = args[0];
-        if( args.length > 1 )
-            targetDirectory = args[1];
-
-        config.setSourceDirectory(sourceDirectory);
-        config.setTargetDirectory(targetDirectory);
-
-        StreamsCassandraResourceGenerator streamsCassandraResourceGenerator = 
new StreamsCassandraResourceGenerator(config);
-        streamsCassandraResourceGenerator.run();
+    if ( args.length > 0 ) {
+      sourceDirectory = args[0];
     }
-
-    public StreamsCassandraResourceGenerator(StreamsCassandraGenerationConfig 
config) {
-        this.config = config;
+    if ( args.length > 1 ) {
+      targetDirectory = args[1];
     }
 
-    public void run() {
+    config.setSourceDirectory(sourceDirectory);
+    config.setTargetDirectory(targetDirectory);
 
-        checkNotNull(config);
+    StreamsCassandraResourceGenerator streamsCassandraResourceGenerator = new 
StreamsCassandraResourceGenerator(config);
+    streamsCassandraResourceGenerator.run();
+  }
 
-        generate(config);
+  public StreamsCassandraResourceGenerator(StreamsCassandraGenerationConfig 
config) {
+    this.config = config;
+  }
 
-    }
+  @Override
+  public void run() {
 
-    public void generate(StreamsCassandraGenerationConfig config) {
-
-        LinkedList<File> sourceFiles = new LinkedList<File>();
-
-        for (Iterator<URL> sources = config.getSource(); sources.hasNext();) {
-            URL source = sources.next();
-            sourceFiles.add(URLUtil.getFileFromURL(source));
-        }
+    checkNotNull(config);
 
-        LOGGER.info("Seeded with {} source paths:", sourceFiles.size());
+    generate(config);
 
-        resolveRecursive((GenerationConfig)config, sourceFiles);
+  }
 
-        LOGGER.info("Resolved {} schema files:", sourceFiles.size());
+  /**
+   * Run generation using the supplied StreamsCassandraGenerationConfig.
+   * @param config StreamsCassandraGenerationConfig
+   */
+  public void generate(StreamsCassandraGenerationConfig config) {
 
-        for (Iterator<File> iterator = sourceFiles.iterator(); 
iterator.hasNext();) {
-            File item = iterator.next();
-            schemaStore.create(item.toURI());
-        }
+    LinkedList<File> sourceFiles = new LinkedList<File>();
 
-        LOGGER.info("Identified {} objects:", schemaStore.getSize());
+    for (Iterator<URL> sources = config.getSource(); sources.hasNext();) {
+      URL source = sources.next();
+      sourceFiles.add(URLUtil.getFileFromURL(source));
+    }
 
-        String outputFile = config.getTargetDirectory() + "/" + "types.cql";
-        StringBuilder typesContent = new StringBuilder();
+    LOGGER.info("Seeded with {} source paths:", sourceFiles.size());
 
-        for (Iterator<Schema> schemaIterator = 
schemaStore.getSchemaIterator(); schemaIterator.hasNext(); ) {
-            Schema schema = schemaIterator.next();
-            currentDepth = 0;
-            if( schema.getURI().getScheme().equals("file")) {
-                String inputFile = schema.getURI().getPath();
-                String resourcePath = dropSourcePathPrefix(inputFile, 
config.getSourceDirectory());
-                for (String sourcePath : config.getSourcePaths()) {
-                    resourcePath = dropSourcePathPrefix(resourcePath, 
sourcePath);
-                }
+    resolveRecursive((GenerationConfig)config, sourceFiles);
 
-                String resourceId = schemaSymbol(schema);
+    LOGGER.info("Resolved {} schema files:", sourceFiles.size());
 
-                LOGGER.info("Processing {}", resourcePath);
+    for (Iterator<File> iterator = sourceFiles.iterator(); 
iterator.hasNext();) {
+      File item = iterator.next();
+      schemaStore.create(item.toURI());
+    }
 
-                String resourceContent = generateResource(schema, resourceId);
+    LOGGER.info("Identified {} objects:", schemaStore.getSize());
 
-                typesContent.append(resourceContent);
+    String outputFile = config.getTargetDirectory() + "/" + "types.cql";
+    StringBuilder typesContent = new StringBuilder();
 
-                LOGGER.info("Added {}", resourceId);
-            }
+    for (Iterator<Schema> schemaIterator = schemaStore.getSchemaIterator(); 
schemaIterator.hasNext(); ) {
+      Schema schema = schemaIterator.next();
+      currentDepth = 0;
+      if ( schema.getUri().getScheme().equals("file")) {
+        String inputFile = schema.getUri().getPath();
+        String resourcePath = dropSourcePathPrefix(inputFile, 
config.getSourceDirectory());
+        for (String sourcePath : config.getSourcePaths()) {
+          resourcePath = dropSourcePathPrefix(resourcePath, sourcePath);
         }
 
-        writeFile(outputFile, typesContent.toString());
-
-    }
+        String resourceId = schemaSymbol(schema);
 
-    public String generateResource(Schema schema, String resourceId) {
-        StringBuilder resourceBuilder = new StringBuilder();
-        resourceBuilder.append("CREATE TYPE ");
-        resourceBuilder.append(resourceId);
-        resourceBuilder.append(" IF NOT EXISTS (");
-        resourceBuilder.append(LS);
-        resourceBuilder = appendRootObject(resourceBuilder, schema, 
resourceId, ' ');
-        resourceBuilder.append(");");
-        resourceBuilder.append(LS);
-        return resourceBuilder.toString();
-    }
+        LOGGER.info("Processing {}", resourcePath);
 
-    public StringBuilder appendRootObject(StringBuilder builder, Schema 
schema, String resourceId, Character seperator) {
-        ObjectNode propertiesNode = schemaStore.resolveProperties(schema, 
null, resourceId);
-        if( propertiesNode.get("id") != null ) {
-            builder.append("id text PRIMARY KEY,");
-            builder.append(LS);
-            propertiesNode.remove("id");
-        }
-        if( propertiesNode != null && propertiesNode.isObject() && 
propertiesNode.size() > 0) {
-            builder = appendPropertiesNode(builder, schema, propertiesNode, 
seperator);
-        }
-        return builder;
-    }
+        String resourceContent = generateResource(schema, resourceId);
 
-    private StringBuilder appendValueField(StringBuilder builder, Schema 
schema, String fieldId, FieldType fieldType, Character seperator) {
-        // safe to append nothing
-        checkNotNull(builder);
-        builder.append(cqlEscape(fieldId));
-        builder.append(seperator);
-        builder.append(cqlType(fieldType));
-        return builder;
-    }
+        typesContent.append(resourceContent);
 
-    public StringBuilder appendArrayItems(StringBuilder builder, Schema 
schema, String fieldId, ObjectNode itemsNode, Character seperator) {
-        // not safe to append nothing
-        checkNotNull(builder);
-        if( itemsNode == null ) return builder;
-        if( itemsNode.has("type")) {
-            try {
-                FieldType itemType = FieldUtil.determineFieldType(itemsNode);
-                switch( itemType ) {
-                    case OBJECT:
-                        Schema objectSchema = null;
-                        URI parentURI = null;
-                        if( itemsNode.has("$ref") || itemsNode.has("extends") 
) {
-                            JsonNode refNode = itemsNode.get("$ref");
-                            JsonNode extendsNode = itemsNode.get("extends");
-                            if (refNode != null && refNode.isValueNode())
-                                parentURI = URI.create(refNode.asText());
-                            else if (extendsNode != null && 
extendsNode.isObject())
-                                parentURI = 
URI.create(extendsNode.get("$ref").asText());
-                            URI absoluteURI;
-                            if (parentURI.isAbsolute())
-                                absoluteURI = parentURI;
-                            else {
-                                absoluteURI = 
schema.getURI().resolve(parentURI);
-                                if (!absoluteURI.isAbsolute() || 
(absoluteURI.isAbsolute() && !schemaStore.getByUri(absoluteURI).isPresent() ))
-                                    absoluteURI = 
schema.getParentURI().resolve(parentURI);
-                            }
-                            if (absoluteURI != null && 
absoluteURI.isAbsolute()) {
-                                Optional<Schema> schemaLookup = 
schemaStore.getByUri(absoluteURI);
-                                if (schemaLookup.isPresent()) {
-                                    objectSchema = schemaLookup.get();
-                                }
-                            }
-                        }
-                        // have to resolve schema here
-
-                        builder = appendArrayObject(builder, objectSchema, 
fieldId, seperator);
-                        break;
-                    case ARRAY:
-                        ObjectNode subArrayItems = (ObjectNode) 
itemsNode.get("items");
-                        builder = appendArrayItems(builder, schema, fieldId, 
subArrayItems, seperator);
-                        break;
-                    default:
-                        builder = appendArrayField(builder, schema, fieldId, 
itemType, seperator);
-                }
-            } catch (Exception e) {
-                LOGGER.warn("No item type resolvable for {}", fieldId);
-            }
-        }
-        checkNotNull(builder);
-        return builder;
+        LOGGER.info("Added {}", resourceId);
+      }
     }
 
-    private StringBuilder appendArrayField(StringBuilder builder, Schema 
schema, String fieldId, FieldType fieldType, Character seperator) {
-        // safe to append nothing
-        checkNotNull(builder);
-        checkNotNull(fieldId);
-        builder.append(cqlEscape(fieldId));
-        builder.append(seperator);
-        builder.append("list<"+cqlType(fieldType)+">");
-        checkNotNull(builder);
-        return builder;
+    writeFile(outputFile, typesContent.toString());
+
+  }
+
+  /**
+   * Generate the CREATE TYPE statement text for a schema and resourceId.
+   * @param schema Schema
+   * @param resourceId String
+   * @return CREATE TYPE ...
+   */
+  public String generateResource(Schema schema, String resourceId) {
+    StringBuilder resourceBuilder = new StringBuilder();
+    resourceBuilder.append("CREATE TYPE ");
+    resourceBuilder.append(resourceId);
+    resourceBuilder.append(" IF NOT EXISTS (");
+    resourceBuilder.append(LS);
+    resourceBuilder = appendRootObject(resourceBuilder, schema, resourceId, ' 
');
+    resourceBuilder.append(");");
+    resourceBuilder.append(LS);
+    return resourceBuilder.toString();
+  }
+
+  protected StringBuilder appendRootObject(StringBuilder builder, Schema 
schema, String resourceId, Character seperator) {
+    ObjectNode propertiesNode = schemaStore.resolveProperties(schema, null, 
resourceId);
+    if ( propertiesNode.get("id") != null ) {
+      builder.append("id text PRIMARY KEY,");
+      builder.append(LS);
+      propertiesNode.remove("id");
     }
-
-    private StringBuilder appendArrayObject(StringBuilder builder, Schema 
schema, String fieldId, Character seperator) {
-        // safe to append nothing
-        checkNotNull(builder);
-        String schemaSymbol = schemaSymbol(schema);
-        if( !Strings.isNullOrEmpty(fieldId) && schemaSymbol != null ) {
-            builder.append(cqlEscape(fieldId));
-            builder.append(seperator);
-            builder.append("list<" + schemaSymbol + ">");
-            builder.append(LS);
-        }
-        checkNotNull(builder);
-        return builder;
+    if ( propertiesNode != null && propertiesNode.isObject() && 
propertiesNode.size() > 0) {
+      builder = appendPropertiesNode(builder, schema, propertiesNode, 
seperator);
     }
-
-    private StringBuilder appendSchemaField(StringBuilder builder, Schema 
schema, String fieldId, Character seperator) {
-        // safe to append nothing
-        checkNotNull(builder);
-        String schemaSymbol = schemaSymbol(schema);
-        if( !Strings.isNullOrEmpty(fieldId) && schemaSymbol != null ) {
-            builder.append(cqlEscape(fieldId));
-            builder.append(seperator);
-            builder.append(schemaSymbol);
-        }
-        checkNotNull(builder);
-        return builder;
+    return builder;
+  }
+
+  private StringBuilder appendValueField(StringBuilder builder, Schema schema, 
String fieldId, FieldType fieldType, Character seperator) {
+    // safe to append nothing
+    checkNotNull(builder);
+    builder.append(cqlEscape(fieldId));
+    builder.append(seperator);
+    builder.append(cqlType(fieldType));
+    return builder;
+  }
+
+  protected StringBuilder appendArrayItems(StringBuilder builder, Schema 
schema, String fieldId, ObjectNode itemsNode, Character seperator) {
+    // not safe to append nothing
+    checkNotNull(builder);
+    if ( itemsNode == null ) {
+      return builder;
     }
-
-    /*
-     can this be moved to streams-schemas if schemastore available in scope?
-     maybe an interface?
-     lot of boilerplate / reuse between plugins
-     however treatment is way different when resolving a type symbol vs 
resolving and listing fields .
-     */
-    private StringBuilder appendPropertiesNode(StringBuilder builder, Schema 
schema, ObjectNode propertiesNode, Character seperator) {
-        checkNotNull(builder);
-        checkNotNull(propertiesNode);
-        Iterator<Map.Entry<String, JsonNode>> fields = propertiesNode.fields();
-        Joiner joiner = Joiner.on(","+LS).skipNulls();
-        List<String> fieldStrings = Lists.newArrayList();
-        for( ; fields.hasNext(); ) {
-            Map.Entry<String, JsonNode> field = fields.next();
-            String fieldId = field.getKey();
-            if( !config.getExclusions().contains(fieldId) && 
field.getValue().isObject()) {
-                ObjectNode fieldNode = (ObjectNode) field.getValue();
-                FieldType fieldType = FieldUtil.determineFieldType(fieldNode);
-                if (fieldType != null ) {
-                    switch (fieldType) {
-                        case ARRAY:
-                            ObjectNode itemsNode = (ObjectNode) 
fieldNode.get("items");
-                            if( currentDepth <= config.getMaxDepth()) {
-                                StringBuilder arrayItemsBuilder = 
appendArrayItems(new StringBuilder(), schema, fieldId, itemsNode, seperator);
-                                if( 
!Strings.isNullOrEmpty(arrayItemsBuilder.toString())) {
-                                    
fieldStrings.add(arrayItemsBuilder.toString());
-                                }
-                            }
-                            break;
-                        case OBJECT:
-                            Schema objectSchema = null;
-                            URI parentURI = null;
-                            if( fieldNode.has("$ref") || 
fieldNode.has("extends") ) {
-                                JsonNode refNode = fieldNode.get("$ref");
-                                JsonNode extendsNode = 
fieldNode.get("extends");
-                                if (refNode != null && refNode.isValueNode())
-                                    parentURI = URI.create(refNode.asText());
-                                else if (extendsNode != null && 
extendsNode.isObject())
-                                    parentURI = 
URI.create(extendsNode.get("$ref").asText());
-                                URI absoluteURI;
-                                if (parentURI.isAbsolute())
-                                    absoluteURI = parentURI;
-                                else {
-                                    absoluteURI = 
schema.getURI().resolve(parentURI);
-                                    if (!absoluteURI.isAbsolute() || 
(absoluteURI.isAbsolute() && !schemaStore.getByUri(absoluteURI).isPresent() ))
-                                        absoluteURI = 
schema.getParentURI().resolve(parentURI);
-                                }
-                                if (absoluteURI != null && 
absoluteURI.isAbsolute()) {
-                                    Optional<Schema> schemaLookup = 
schemaStore.getByUri(absoluteURI);
-                                    if (schemaLookup.isPresent()) {
-                                        objectSchema = schemaLookup.get();
-                                    }
-                                }
-                            }
-                            //ObjectNode childProperties = 
schemaStore.resolveProperties(schema, fieldNode, fieldId);
-                            if( currentDepth < config.getMaxDepth()) {
-                                StringBuilder structFieldBuilder = 
appendSchemaField(new StringBuilder(), objectSchema, fieldId, seperator);
-                                if( 
!Strings.isNullOrEmpty(structFieldBuilder.toString())) {
-                                    
fieldStrings.add(structFieldBuilder.toString());
-                                }
-                            }
-                            break;
-                        default:
-                            StringBuilder valueFieldBuilder = 
appendValueField(new StringBuilder(), schema, fieldId, fieldType, seperator);
-                            if( 
!Strings.isNullOrEmpty(valueFieldBuilder.toString())) {
-                                fieldStrings.add(valueFieldBuilder.toString());
-                            }
-                    }
+    if ( itemsNode.has("type")) {
+      try {
+        FieldType itemType = FieldUtil.determineFieldType(itemsNode);
+        switch ( itemType ) {
+          case OBJECT:
+            Schema objectSchema = null;
+            URI parentUri = null;
+            if ( itemsNode.has("$ref") || itemsNode.has("extends") ) {
+              JsonNode refNode = itemsNode.get("$ref");
+              JsonNode extendsNode = itemsNode.get("extends");
+              if (refNode != null && refNode.isValueNode()) {
+                parentUri = URI.create(refNode.asText());
+              } else if (extendsNode != null && extendsNode.isObject()) {
+                parentUri = URI.create(extendsNode.get("$ref").asText());
+              }
+              URI absoluteUri;
+              if (parentUri.isAbsolute()) {
+                absoluteUri = parentUri;
+              } else {
+                absoluteUri = schema.getUri().resolve(parentUri);
+                if (!absoluteUri.isAbsolute() || (absoluteUri.isAbsolute() && 
!schemaStore.getByUri(absoluteUri).isPresent() )) {
+                  absoluteUri = schema.getParentUri().resolve(parentUri);
+                }
+              }
+              if (absoluteUri != null && absoluteUri.isAbsolute()) {
+                Optional<Schema> schemaLookup = 
schemaStore.getByUri(absoluteUri);
+                if (schemaLookup.isPresent()) {
+                  objectSchema = schemaLookup.get();
                 }
+              }
             }
+            // have to resolve schema here
+
+            builder = appendArrayObject(builder, objectSchema, fieldId, 
seperator);
+            break;
+          case ARRAY:
+            ObjectNode subArrayItems = (ObjectNode) itemsNode.get("items");
+            builder = appendArrayItems(builder, schema, fieldId, 
subArrayItems, seperator);
+            break;
+          default:
+            builder = appendArrayField(builder, schema, fieldId, itemType, 
seperator);
         }
-        builder.append(joiner.join(fieldStrings)).append(LS);
-        Preconditions.checkNotNull(builder);
-        return builder;
+      } catch (Exception ex) {
+        LOGGER.warn("No item type resolvable for {}", fieldId);
+      }
     }
-
-    private static String cqlEscape( String fieldId ) {
-        return "`"+fieldId+"`";
+    checkNotNull(builder);
+    return builder;
+  }
+
+  private StringBuilder appendArrayField(StringBuilder builder, Schema schema, 
String fieldId, FieldType fieldType, Character seperator) {
+    // safe to append nothing
+    checkNotNull(builder);
+    checkNotNull(fieldId);
+    builder.append(cqlEscape(fieldId));
+    builder.append(seperator);
+    builder.append("list<" + cqlType(fieldType) + ">");
+    checkNotNull(builder);
+    return builder;
+  }
+
+  private StringBuilder appendArrayObject(StringBuilder builder, Schema 
schema, String fieldId, Character seperator) {
+    // safe to append nothing
+    checkNotNull(builder);
+    String schemaSymbol = schemaSymbol(schema);
+    if ( !Strings.isNullOrEmpty(fieldId) && schemaSymbol != null ) {
+      builder.append(cqlEscape(fieldId));
+      builder.append(seperator);
+      builder.append("list<" + schemaSymbol + ">");
+      builder.append(LS);
     }
-
-    private static String cqlType( FieldType fieldType ) {
-        switch( fieldType ) {
-            case STRING:
-                return "text";
-            case INTEGER:
-                return "int";
-            case NUMBER:
-                return "double";
-            case OBJECT:
-                return "tuple";
+    checkNotNull(builder);
+    return builder;
+  }
+
+  private StringBuilder appendSchemaField(StringBuilder builder, Schema 
schema, String fieldId, Character seperator) {
+    // safe to append nothing
+    checkNotNull(builder);
+    String schemaSymbol = schemaSymbol(schema);
+    if ( !Strings.isNullOrEmpty(fieldId) && schemaSymbol != null ) {
+      builder.append(cqlEscape(fieldId));
+      builder.append(seperator);
+      builder.append(schemaSymbol);
+    }
+    checkNotNull(builder);
+    return builder;
+  }
+
+  /*
+   Can this be moved to streams-schemas if a SchemaStore is available in scope?
+   Maybe as an interface?
+   There is a lot of boilerplate / reuse between plugins; however, the
+   treatment differs when resolving a type symbol vs. listing fields.
+   */
+  private StringBuilder appendPropertiesNode(StringBuilder builder, Schema 
schema, ObjectNode propertiesNode, Character seperator) {
+    checkNotNull(builder);
+    checkNotNull(propertiesNode);
+    Iterator<Map.Entry<String, JsonNode>> fields = propertiesNode.fields();
+    Joiner joiner = Joiner.on("," + LS).skipNulls();
+    List<String> fieldStrings = Lists.newArrayList();
+    for ( ; fields.hasNext(); ) {
+      Map.Entry<String, JsonNode> field = fields.next();
+      String fieldId = field.getKey();
+      if ( !config.getExclusions().contains(fieldId) && 
field.getValue().isObject()) {
+        ObjectNode fieldNode = (ObjectNode) field.getValue();
+        FieldType fieldType = FieldUtil.determineFieldType(fieldNode);
+        if (fieldType != null ) {
+          switch (fieldType) {
             case ARRAY:
-                return "list";
+              ObjectNode itemsNode = (ObjectNode) fieldNode.get("items");
+              if ( currentDepth <= config.getMaxDepth()) {
+                StringBuilder arrayItemsBuilder = appendArrayItems(new 
StringBuilder(), schema, fieldId, itemsNode, seperator);
+                if ( !Strings.isNullOrEmpty(arrayItemsBuilder.toString())) {
+                  fieldStrings.add(arrayItemsBuilder.toString());
+                }
+              }
+              break;
+            case OBJECT:
+              Schema objectSchema = null;
+              URI parentUri = null;
+              if ( fieldNode.has("$ref") || fieldNode.has("extends") ) {
+                JsonNode refNode = fieldNode.get("$ref");
+                JsonNode extendsNode = fieldNode.get("extends");
+                if (refNode != null && refNode.isValueNode()) {
+                  parentUri = URI.create(refNode.asText());
+                } else if (extendsNode != null && extendsNode.isObject()) {
+                  parentUri = URI.create(extendsNode.get("$ref").asText());
+                }
+                URI absoluteUri;
+                if (parentUri.isAbsolute()) {
+                  absoluteUri = parentUri;
+                } else {
+                  absoluteUri = schema.getUri().resolve(parentUri);
+                  if (!absoluteUri.isAbsolute() || (absoluteUri.isAbsolute() 
&& !schemaStore.getByUri(absoluteUri).isPresent() )) {
+                    absoluteUri = schema.getParentUri().resolve(parentUri);
+                  }
+                }
+                if (absoluteUri != null && absoluteUri.isAbsolute()) {
+                  Optional<Schema> schemaLookup = 
schemaStore.getByUri(absoluteUri);
+                  if (schemaLookup.isPresent()) {
+                    objectSchema = schemaLookup.get();
+                  }
+                }
+              }
+              //ObjectNode childProperties = 
schemaStore.resolveProperties(schema, fieldNode, fieldId);
+              if ( currentDepth < config.getMaxDepth()) {
+                StringBuilder structFieldBuilder = appendSchemaField(new 
StringBuilder(), objectSchema, fieldId, seperator);
+                if ( !Strings.isNullOrEmpty(structFieldBuilder.toString())) {
+                  fieldStrings.add(structFieldBuilder.toString());
+                }
+              }
+              break;
             default:
-                return fieldType.name().toUpperCase();
+              StringBuilder valueFieldBuilder = appendValueField(new 
StringBuilder(), schema, fieldId, fieldType, seperator);
+              if ( !Strings.isNullOrEmpty(valueFieldBuilder.toString())) {
+                fieldStrings.add(valueFieldBuilder.toString());
+              }
+          }
         }
+      }
+    }
+    builder.append(joiner.join(fieldStrings)).append(LS);
+    Preconditions.checkNotNull(builder);
+    return builder;
+  }
+
+  private static String cqlEscape( String fieldId ) {
+    return "`" + fieldId + "`";
+  }
+
+  private static String cqlType( FieldType fieldType ) {
+    switch ( fieldType ) {
+      case STRING:
+        return "text";
+      case INTEGER:
+        return "int";
+      case NUMBER:
+        return "double";
+      case OBJECT:
+        return "tuple";
+      case ARRAY:
+        return "list";
+      default:
+        return fieldType.name().toUpperCase();
     }
+  }
 
-    private String schemaSymbol( Schema schema ) {
-        if (schema == null) return null;
-        // this needs to return whatever
-        if (schema.getURI().getScheme().equals("file")) {
-            String inputFile = schema.getURI().getPath();
-            String resourcePath = dropSourcePathPrefix(inputFile, 
config.getSourceDirectory());
-            for (String sourcePath : config.getSourcePaths()) {
-                resourcePath = dropSourcePathPrefix(resourcePath, sourcePath);
-            }
-            return dropExtension(resourcePath).replace("/", "_");
-        } else {
-            return "IDK";
-        }
+  private String schemaSymbol( Schema schema ) {
+    if (schema == null) {
+      return null;
+    }
+    // this needs to return whatever
+    if (schema.getUri().getScheme().equals("file")) {
+      String inputFile = schema.getUri().getPath();
+      String resourcePath = dropSourcePathPrefix(inputFile, 
config.getSourceDirectory());
+      for (String sourcePath : config.getSourcePaths()) {
+        resourcePath = dropSourcePathPrefix(resourcePath, sourcePath);
+      }
+      return dropExtension(resourcePath).replace("/", "_");
+    } else {
+      return "IDK";
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraResourceGeneratorMojo.java
----------------------------------------------------------------------
diff --git 
a/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraResourceGeneratorMojo.java
 
b/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraResourceGeneratorMojo.java
index 038e744..3625f28 100644
--- 
a/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraResourceGeneratorMojo.java
+++ 
b/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraResourceGeneratorMojo.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.plugins.cassandra;
 
 import org.apache.maven.plugin.AbstractMojo;
@@ -33,48 +34,63 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.List;
 
-@Mojo(  name = "generate-resources",
-        defaultPhase = LifecyclePhase.GENERATE_RESOURCES
-)
-@Execute(   goal = "generate-resources",
-            phase = LifecyclePhase.GENERATE_RESOURCES
-)
+@Mojo (
+    name = "generate-resources",
+    defaultPhase = LifecyclePhase.GENERATE_RESOURCES
+    )
+@Execute (
+    goal = "generate-resources",
+    phase = LifecyclePhase.GENERATE_RESOURCES
+    )
+/**
+ * Run within a module containing a src/main/jsonschema directory.
+ *
+ * <p/>
+ * mvn org.apache.streams.plugins:streams-plugin-cassandra:0.4-incubating:cassandra
+ *
+ */
 public class StreamsCassandraResourceGeneratorMojo extends AbstractMojo {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(StreamsCassandraResourceGeneratorMojo.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(StreamsCassandraResourceGeneratorMojo.class);
 
-    private volatile MojoFailureException mojoFailureException;
+  private volatile MojoFailureException mojoFailureException;
 
-    @Component
-    private MavenProject project;
+  @Component
+  private MavenProject project;
 
-    @Parameter( defaultValue = "${project.basedir}", readonly = true )
-    private File basedir;
+  @Parameter( defaultValue = "${project.basedir}", readonly = true )
+  private File basedir;
 
-    @Parameter( defaultValue = "src/main/jsonschema", readonly = true ) // Maven 3 only
-    public String sourceDirectory;
+  @Parameter( defaultValue = "src/main/jsonschema", readonly = true ) // Maven 3 only
+  public String sourceDirectory;
 
-    @Parameter( readonly = true ) // Maven 3 only
-    public List<String> sourcePaths;
+  @Parameter( readonly = true ) // Maven 3 only
+  public List<String> sourcePaths;
 
-    @Parameter(defaultValue = "target/generated-resources/cassandra", readonly 
= true)
-    public String targetDirectory;
+  @Parameter(defaultValue = "target/generated-resources/cassandra", readonly = true)
+  public String targetDirectory;
 
-    public void execute() throws MojoExecutionException, MojoFailureException {
+  /**
+   * execute StreamsCassandraResourceGenerator mojo.
+   * @throws MojoExecutionException MojoExecutionException
+   * @throws MojoFailureException MojoFailureException
+   */
+  public void execute() throws MojoExecutionException, MojoFailureException {
 
-        //addProjectDependenciesToClasspath();
+    //addProjectDependenciesToClasspath();
 
-        StreamsCassandraGenerationConfig config = new 
StreamsCassandraGenerationConfig();
+    StreamsCassandraGenerationConfig config = new StreamsCassandraGenerationConfig();
 
-        if( sourcePaths != null && sourcePaths.size() > 0)
-            config.setSourcePaths(sourcePaths);
-        else
-            config.setSourceDirectory(sourceDirectory);
-        config.setTargetDirectory(targetDirectory);
+    if ( sourcePaths != null && sourcePaths.size() > 0) {
+      config.setSourcePaths(sourcePaths);
+    } else {
+      config.setSourceDirectory(sourceDirectory);
+    }
+    config.setTargetDirectory(targetDirectory);
 
-        StreamsCassandraResourceGenerator streamsCassandraResourceGenerator = 
new StreamsCassandraResourceGenerator(config);
+    StreamsCassandraResourceGenerator streamsCassandraResourceGenerator = new StreamsCassandraResourceGenerator(config);
 
-        streamsCassandraResourceGenerator.run();
-    }
+    streamsCassandraResourceGenerator.run();
+  }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-plugins/streams-plugin-cassandra/src/test/java/org/apache/streams/plugins/cassandra/test/StreamsCassandraResourceGeneratorCLITest.java
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-cassandra/src/test/java/org/apache/streams/plugins/cassandra/test/StreamsCassandraResourceGeneratorCLITest.java b/streams-plugins/streams-plugin-cassandra/src/test/java/org/apache/streams/plugins/cassandra/test/StreamsCassandraResourceGeneratorCLITest.java
index cc288e5..63afc2d 100644
--- a/streams-plugins/streams-plugin-cassandra/src/test/java/org/apache/streams/plugins/cassandra/test/StreamsCassandraResourceGeneratorCLITest.java
+++ b/streams-plugins/streams-plugin-cassandra/src/test/java/org/apache/streams/plugins/cassandra/test/StreamsCassandraResourceGeneratorCLITest.java
@@ -16,12 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.plugins.cassandra.test;
 
+import org.apache.streams.plugins.cassandra.StreamsCassandraResourceGenerator;
+
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.streams.plugins.cassandra.StreamsCassandraResourceGenerator;
 import org.junit.Test;
 
 import java.io.File;
@@ -33,38 +35,38 @@ import java.util.List;
 import static org.apache.streams.plugins.cassandra.test.StreamsCassandraResourceGeneratorTest.cqlFilter;
 
 /**
- * Created by sblackmon on 5/5/16.
+ * Test that StreamsCassandraResourceGeneratorCLI generates resources.
  */
 public class StreamsCassandraResourceGeneratorCLITest {
 
-    @Test
-    public void testStreamsCassandraResourceGeneratorCLI() throws Exception {
+  @Test
+  public void testStreamsCassandraResourceGeneratorCLI() throws Exception {
 
-        String sourceDirectory = "target/test-classes/activitystreams-schemas";
-        String targetDirectory = "target/generated-resources/test-cli";
+    String sourceDirectory = "target/test-classes/activitystreams-schemas";
+    String targetDirectory = "target/generated-resources/test-cli";
 
-        List<String> argsList = Lists.newArrayList(sourceDirectory, 
targetDirectory);
-        StreamsCassandraResourceGenerator.main(argsList.toArray(new 
String[0]));
+    List<String> argsList = Lists.newArrayList(sourceDirectory, targetDirectory);
+    StreamsCassandraResourceGenerator.main(argsList.toArray(new String[0]));
 
-        File testOutput = new File( targetDirectory );
+    File testOutput = new File( targetDirectory );
 
-        assert( testOutput != null );
-        assert( testOutput.exists() == true );
-        assert( testOutput.isDirectory() == true );
+    assert ( testOutput != null );
+    assert ( testOutput.exists() == true );
+    assert ( testOutput.isDirectory() == true );
 
-        Iterable<File> outputIterator = 
Files.fileTreeTraverser().breadthFirstTraversal(testOutput)
-                .filter(cqlFilter);
-        Collection<File> outputCollection = Lists.newArrayList(outputIterator);
-        assert( outputCollection.size() == 1 );
+    Iterable<File> outputIterator = Files.fileTreeTraverser().breadthFirstTraversal(testOutput)
+        .filter(cqlFilter);
+    Collection<File> outputCollection = Lists.newArrayList(outputIterator);
+    assert ( outputCollection.size() == 1 );
 
-        Path path = 
Paths.get(testOutput.getAbsolutePath()).resolve("types.cql");
+    Path path = Paths.get(testOutput.getAbsolutePath()).resolve("types.cql");
 
-        assert( path.toFile().exists() );
+    assert ( path.toFile().exists() );
 
-        String typesCqlBytes = new String(
-                java.nio.file.Files.readAllBytes(path));
+    String typesCqlBytes = new String(
+        java.nio.file.Files.readAllBytes(path));
 
-        assert( StringUtils.countMatches(typesCqlBytes, "CREATE TYPE") == 133 
);
+    assert ( StringUtils.countMatches(typesCqlBytes, "CREATE TYPE") == 133 );
 
-    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-plugins/streams-plugin-cassandra/src/test/java/org/apache/streams/plugins/cassandra/test/StreamsCassandraResourceGeneratorMojoIT.java
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-cassandra/src/test/java/org/apache/streams/plugins/cassandra/test/StreamsCassandraResourceGeneratorMojoIT.java b/streams-plugins/streams-plugin-cassandra/src/test/java/org/apache/streams/plugins/cassandra/test/StreamsCassandraResourceGeneratorMojoIT.java
index fc7765e..1eada8a 100644
--- a/streams-plugins/streams-plugin-cassandra/src/test/java/org/apache/streams/plugins/cassandra/test/StreamsCassandraResourceGeneratorMojoIT.java
+++ b/streams-plugins/streams-plugin-cassandra/src/test/java/org/apache/streams/plugins/cassandra/test/StreamsCassandraResourceGeneratorMojoIT.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.plugins.cassandra.test;
 
 import com.google.common.collect.Lists;
@@ -38,59 +39,58 @@ import java.util.List;
 import static org.apache.streams.plugins.cassandra.test.StreamsCassandraResourceGeneratorTest.cqlFilter;
 
 /**
- * Tests that streams-plugin-hive running via maven generates hql resources
+ * Tests that streams-plugin-cassandra running via maven generates cql resources.
  */
 public class StreamsCassandraResourceGeneratorMojoIT extends TestCase {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(StreamsCassandraResourceGeneratorMojoIT.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(StreamsCassandraResourceGeneratorMojoIT.class);
 
-    protected void setUp() throws Exception
-    {
-        // required for mojo lookups to work
-        super.setUp();
-    }
+  protected void setUp() throws Exception {
+    // required for mojo lookups to work
+    super.setUp();
+  }
 
-    @Test
-    public void testStreamsCassandraResourceGeneratorMojo() throws Exception {
+  @Test
+  public void testStreamsCassandraResourceGeneratorMojo() throws Exception {
 
-        File testDir = ResourceExtractor.simpleExtractResources( getClass(), 
"/streams-plugin-cassandra" );
+    File testDir = ResourceExtractor.simpleExtractResources( getClass(), "/streams-plugin-cassandra" );
 
-        Verifier verifier;
+    Verifier verifier;
 
-        verifier = new Verifier( testDir.getAbsolutePath() );
+    verifier = new Verifier( testDir.getAbsolutePath() );
 
-        List cliOptions = new ArrayList();
-        cliOptions.add( "-N" );
-        verifier.executeGoals( Lists.<String>newArrayList(
-                "clean",
-                "dependency:unpack-dependencies",
-                "generate-resources"));
+    List cliOptions = new ArrayList();
+    cliOptions.add( "-N" );
+    verifier.executeGoals( Lists.<String>newArrayList(
+        "clean",
+        "dependency:unpack-dependencies",
+        "generate-resources"));
 
-        verifier.verifyErrorFreeLog();
+    verifier.verifyErrorFreeLog();
 
-        verifier.resetStreams();
+    verifier.resetStreams();
 
-        Path testOutputPath = 
Paths.get(testDir.getAbsolutePath()).resolve("target/generated-resources/test-mojo");
+    Path testOutputPath = Paths.get(testDir.getAbsolutePath()).resolve("target/generated-resources/test-mojo");
 
-        File testOutput = testOutputPath.toFile();
+    File testOutput = testOutputPath.toFile();
 
-        assert( testOutput != null );
-        assert( testOutput.exists() == true );
-        assert( testOutput.isDirectory() == true );
+    assert ( testOutput != null );
+    assert ( testOutput.exists() == true );
+    assert ( testOutput.isDirectory() == true );
 
-        Iterable<File> outputIterator = 
Files.fileTreeTraverser().breadthFirstTraversal(testOutput)
-                .filter(cqlFilter);
-        Collection<File> outputCollection = Lists.newArrayList(outputIterator);
-        assert( outputCollection.size() == 1 );
+    Iterable<File> outputIterator = Files.fileTreeTraverser().breadthFirstTraversal(testOutput)
+        .filter(cqlFilter);
+    Collection<File> outputCollection = Lists.newArrayList(outputIterator);
+    assert ( outputCollection.size() == 1 );
 
-        Path path = testOutputPath.resolve("types.cql");
+    Path path = testOutputPath.resolve("types.cql");
 
-        assert( path.toFile().exists() );
+    assert ( path.toFile().exists() );
 
-        String typesCqlBytes = new String(
-                java.nio.file.Files.readAllBytes(path));
+    String typesCqlBytes = new String(
+        java.nio.file.Files.readAllBytes(path));
 
-        assert( StringUtils.countMatches(typesCqlBytes, "CREATE TYPE") == 133 
);
+    assert ( StringUtils.countMatches(typesCqlBytes, "CREATE TYPE") == 133 );
 
-    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-plugins/streams-plugin-cassandra/src/test/java/org/apache/streams/plugins/cassandra/test/StreamsCassandraResourceGeneratorTest.java
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-cassandra/src/test/java/org/apache/streams/plugins/cassandra/test/StreamsCassandraResourceGeneratorTest.java b/streams-plugins/streams-plugin-cassandra/src/test/java/org/apache/streams/plugins/cassandra/test/StreamsCassandraResourceGeneratorTest.java
index 0ebdb2c..210831f 100644
--- a/streams-plugins/streams-plugin-cassandra/src/test/java/org/apache/streams/plugins/cassandra/test/StreamsCassandraResourceGeneratorTest.java
+++ b/streams-plugins/streams-plugin-cassandra/src/test/java/org/apache/streams/plugins/cassandra/test/StreamsCassandraResourceGeneratorTest.java
@@ -16,83 +16,87 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.plugins.cassandra.test;
 
+import org.apache.streams.plugins.cassandra.StreamsCassandraGenerationConfig;
+import org.apache.streams.plugins.cassandra.StreamsCassandraResourceGenerator;
+
 import com.google.common.base.Predicate;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.streams.plugins.cassandra.StreamsCassandraGenerationConfig;
-import org.apache.streams.plugins.cassandra.StreamsCassandraResourceGenerator;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collection;
+import javax.annotation.Nullable;
 
 /**
  * Test that cassandra resources are generated.
  */
 public class StreamsCassandraResourceGeneratorTest {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(StreamsCassandraResourceGeneratorTest.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(StreamsCassandraResourceGeneratorTest.class);
 
-    public static final Predicate<File> cqlFilter = new Predicate<File>() {
-        @Override
-        public boolean apply(@Nullable File file) {
-            if( file.getName().endsWith(".cql") )
-                return true;
-            else return false;
-        }
-    };
+  public static final Predicate<File> cqlFilter = new Predicate<File>() {
+    @Override
+    public boolean apply(@Nullable File file) {
+      if ( file.getName().endsWith(".cql") ) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+  };
 
-    /**
-     * Test that cassandra resources are generated
-     *
-     * @throws Exception
-     */
-    @Test
-    public void StreamsCassandraResourceGenerator() throws Exception {
+  /**
+   * Test that cassandra resources are generated.
+   *
+   * @throws Exception Exception
+   */
+  @Test
+  public void testStreamsCassandraResourceGenerator() throws Exception {
 
-        StreamsCassandraGenerationConfig config = new 
StreamsCassandraGenerationConfig();
+    StreamsCassandraGenerationConfig config = new StreamsCassandraGenerationConfig();
 
-        String sourceDirectory = "target/test-classes/activitystreams-schemas";
+    String sourceDirectory = "target/test-classes/activitystreams-schemas";
 
-        config.setSourceDirectory(sourceDirectory);
+    config.setSourceDirectory(sourceDirectory);
 
-        config.setTargetDirectory("target/generated-resources/cassandra");
+    config.setTargetDirectory("target/generated-resources/cassandra");
 
-        config.setExclusions(Sets.newHashSet("attachments"));
+    config.setExclusions(Sets.newHashSet("attachments"));
 
-        config.setMaxDepth(2);
+    config.setMaxDepth(2);
 
-        StreamsCassandraResourceGenerator streamsCassandraResourceGenerator = 
new StreamsCassandraResourceGenerator(config);
-        streamsCassandraResourceGenerator.run();
+    StreamsCassandraResourceGenerator streamsCassandraResourceGenerator = new StreamsCassandraResourceGenerator(config);
+    streamsCassandraResourceGenerator.run();
 
-        File testOutput = config.getTargetDirectory();
+    File testOutput = config.getTargetDirectory();
 
-        assert( testOutput != null );
-        assert( testOutput.exists() == true );
-        assert( testOutput.isDirectory() == true );
+    assert ( testOutput != null );
+    assert ( testOutput.exists() == true );
+    assert ( testOutput.isDirectory() == true );
 
-        Iterable<File> outputIterator = 
Files.fileTreeTraverser().breadthFirstTraversal(testOutput)
-                .filter(cqlFilter);
-        Collection<File> outputCollection = Lists.newArrayList(outputIterator);
-        assert( outputCollection.size() == 1 );
+    Iterable<File> outputIterator = Files.fileTreeTraverser().breadthFirstTraversal(testOutput)
+        .filter(cqlFilter);
+    Collection<File> outputCollection = Lists.newArrayList(outputIterator);
+    assert ( outputCollection.size() == 1 );
 
-        Path path = 
Paths.get(testOutput.getAbsolutePath()).resolve("types.cql");
+    Path path = Paths.get(testOutput.getAbsolutePath()).resolve("types.cql");
 
-        assert( path.toFile().exists() );
+    assert ( path.toFile().exists() );
 
-        String typesCqlBytes = new String(
-                java.nio.file.Files.readAllBytes(path));
+    String typesCqlBytes = new String(
+        java.nio.file.Files.readAllBytes(path));
 
-        assert( StringUtils.countMatches(typesCqlBytes, "CREATE TYPE") == 133 
);
+    assert ( StringUtils.countMatches(typesCqlBytes, "CREATE TYPE") == 133 );
 
-    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-plugins/streams-plugin-elasticsearch/src/main/java/org/apache/streams/plugins/elasticsearch/StreamsElasticsearchGenerationConfig.java
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-elasticsearch/src/main/java/org/apache/streams/plugins/elasticsearch/StreamsElasticsearchGenerationConfig.java b/streams-plugins/streams-plugin-elasticsearch/src/main/java/org/apache/streams/plugins/elasticsearch/StreamsElasticsearchGenerationConfig.java
index 3e109a8..2a51a0c 100644
--- a/streams-plugins/streams-plugin-elasticsearch/src/main/java/org/apache/streams/plugins/elasticsearch/StreamsElasticsearchGenerationConfig.java
+++ b/streams-plugins/streams-plugin-elasticsearch/src/main/java/org/apache/streams/plugins/elasticsearch/StreamsElasticsearchGenerationConfig.java
@@ -16,9 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.plugins.elasticsearch;
 
 import org.apache.streams.util.schema.GenerationConfig;
+
 import org.jsonschema2pojo.DefaultGenerationConfig;
 import org.jsonschema2pojo.util.URLUtil;
 
@@ -32,68 +34,71 @@ import java.util.List;
 import java.util.Set;
 
 /**
- * Configures StreamsElasticsearchResourceGenerator
- *
- *
+ * Configures StreamsElasticsearchResourceGenerator.
  */
 public class StreamsElasticsearchGenerationConfig extends DefaultGenerationConfig implements GenerationConfig {
 
-    public String getSourceDirectory() {
-        return sourceDirectory;
-    }
+  public String getSourceDirectory() {
+    return sourceDirectory;
+  }
 
-    public List<String> getSourcePaths() {
-        return sourcePaths;
-    }
+  public List<String> getSourcePaths() {
+    return sourcePaths;
+  }
 
-    private String sourceDirectory;
-    private List<String> sourcePaths = new ArrayList<String>();
-    private String targetDirectory;
-    private int maxDepth = 1;
+  private String sourceDirectory;
+  private List<String> sourcePaths = new ArrayList<String>();
+  private String targetDirectory;
+  private int maxDepth = 1;
 
-    public Set<String> getExclusions() {
-        return exclusions;
-    }
+  public Set<String> getExclusions() {
+    return exclusions;
+  }
 
-    public void setExclusions(Set<String> exclusions) {
-        this.exclusions = exclusions;
-    }
+  public void setExclusions(Set<String> exclusions) {
+    this.exclusions = exclusions;
+  }
 
-    private Set<String> exclusions = new HashSet<String>();
+  private Set<String> exclusions = new HashSet<String>();
 
-    public int getMaxDepth() {
-        return maxDepth;
-    }
+  public int getMaxDepth() {
+    return maxDepth;
+  }
 
-    public void setSourceDirectory(String sourceDirectory) {
-        this.sourceDirectory = sourceDirectory;
-    }
+  public void setSourceDirectory(String sourceDirectory) {
+    this.sourceDirectory = sourceDirectory;
+  }
 
-    public void setSourcePaths(List<String> sourcePaths) {
-        this.sourcePaths = sourcePaths;
-    }
+  public void setSourcePaths(List<String> sourcePaths) {
+    this.sourcePaths = sourcePaths;
+  }
 
-    public void setTargetDirectory(String targetDirectory) {
-        this.targetDirectory = targetDirectory;
-    }
+  public void setTargetDirectory(String targetDirectory) {
+    this.targetDirectory = targetDirectory;
+  }
 
-    public File getTargetDirectory() {
-        return new File(targetDirectory);
-    }
+  public File getTargetDirectory() {
+    return new File(targetDirectory);
+  }
 
-    public Iterator<URL> getSource() {
-        if (null != sourceDirectory) {
-            return 
Collections.singleton(URLUtil.parseURL(sourceDirectory)).iterator();
-        }
-        List<URL> sourceURLs = new ArrayList<URL>();
-        if( sourcePaths != null && sourcePaths.size() > 0)
-            for (String source : sourcePaths) {
-                sourceURLs.add(URLUtil.parseURL(source));
-            }
-        return sourceURLs.iterator();
+  /**
+   * get all sources.
+   * @return Iterator of URL
+   */
+  public Iterator<URL> getSource() {
+    if (null != sourceDirectory) {
+      return Collections.singleton(URLUtil.parseURL(sourceDirectory)).iterator();
     }
-
-    public void setMaxDepth(int maxDepth) {
-        this.maxDepth = maxDepth;
+    List<URL> sourceUrls = new ArrayList<URL>();
+    if ( sourcePaths != null && sourcePaths.size() > 0) {
+      for (String source : sourcePaths) {
+        sourceUrls.add(URLUtil.parseURL(source));
+      }
     }
+    return sourceUrls.iterator();
+  }
+
+  public void setMaxDepth(int maxDepth) {
+    this.maxDepth = maxDepth;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-plugins/streams-plugin-elasticsearch/src/main/java/org/apache/streams/plugins/elasticsearch/StreamsElasticsearchResourceGenerator.java
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-elasticsearch/src/main/java/org/apache/streams/plugins/elasticsearch/StreamsElasticsearchResourceGenerator.java b/streams-plugins/streams-plugin-elasticsearch/src/main/java/org/apache/streams/plugins/elasticsearch/StreamsElasticsearchResourceGenerator.java
index 96e2ecd..47db819 100644
--- a/streams-plugins/streams-plugin-elasticsearch/src/main/java/org/apache/streams/plugins/elasticsearch/StreamsElasticsearchResourceGenerator.java
+++ b/streams-plugins/streams-plugin-elasticsearch/src/main/java/org/apache/streams/plugins/elasticsearch/StreamsElasticsearchResourceGenerator.java
@@ -16,15 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.plugins.elasticsearch;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.util.schema.FieldType;
 import org.apache.streams.util.schema.FieldUtil;
@@ -32,6 +26,14 @@ import org.apache.streams.util.schema.GenerationConfig;
 import org.apache.streams.util.schema.Schema;
 import org.apache.streams.util.schema.SchemaStore;
 import org.apache.streams.util.schema.SchemaStoreImpl;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
 import org.jsonschema2pojo.util.URLUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,340 +55,370 @@ import static org.apache.streams.util.schema.FileUtil.writeFile;
 
 public class StreamsElasticsearchResourceGenerator implements Runnable {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(StreamsElasticsearchResourceGenerator.class);
-
-    private ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
-    private final static String LS = System.getProperty("line.separator");
-
-    private StreamsElasticsearchGenerationConfig config;
+  private static final Logger LOGGER = LoggerFactory.getLogger(StreamsElasticsearchResourceGenerator.class);
 
-    private SchemaStore schemaStore = new SchemaStoreImpl();
+  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-    private int currentDepth = 0;
+  private static final String LS = System.getProperty("line.separator");
 
-    public static void main(String[] args) {
-        StreamsElasticsearchGenerationConfig config = new 
StreamsElasticsearchGenerationConfig();
+  private StreamsElasticsearchGenerationConfig config;
 
-        String sourceDirectory = "src/main/jsonschema";
-        String targetDirectory = 
"target/generated-resources/streams-plugin-elasticsearch";
+  private SchemaStore schemaStore = new SchemaStoreImpl();
 
-        if( args.length > 0 )
-            sourceDirectory = args[0];
-        if( args.length > 1 )
-            targetDirectory = args[1];
+  private int currentDepth = 0;
 
-        config.setSourceDirectory(sourceDirectory);
-        config.setTargetDirectory(targetDirectory);
+  /**
+   * Run from CLI without Maven
+   *
+   * <p/>
+   * java -jar streams-plugin-elasticsearch-jar-with-dependencies.jar StreamsElasticsearchResourceGenerator src/main/jsonschema target/generated-resources
+   *
+   * @param args [sourceDirectory, targetDirectory]
+   */
+  public static void main(String[] args) {
+    StreamsElasticsearchGenerationConfig config = new StreamsElasticsearchGenerationConfig();
 
-        StreamsElasticsearchResourceGenerator 
streamsElasticsearchResourceGenerator = new 
StreamsElasticsearchResourceGenerator(config);
-        streamsElasticsearchResourceGenerator.run();
+    String sourceDirectory = "src/main/jsonschema";
+    String targetDirectory = "target/generated-resources/streams-plugin-elasticsearch";
 
+    if ( args.length > 0 ) {
+      sourceDirectory = args[0];
     }
-
-    public 
StreamsElasticsearchResourceGenerator(StreamsElasticsearchGenerationConfig 
config) {
-        this.config = config;
+    if ( args.length > 1 ) {
+      targetDirectory = args[1];
     }
 
-    public void run() {
+    config.setSourceDirectory(sourceDirectory);
+    config.setTargetDirectory(targetDirectory);
 
-        Objects.requireNonNull(config);
+    StreamsElasticsearchResourceGenerator streamsElasticsearchResourceGenerator = new StreamsElasticsearchResourceGenerator(config);
+    streamsElasticsearchResourceGenerator.run();
 
-        generate(config);
+  }
 
-    }
+  public StreamsElasticsearchResourceGenerator(StreamsElasticsearchGenerationConfig config) {
+    this.config = config;
+  }
 
-    public void generate(StreamsElasticsearchGenerationConfig config) {
+  @Override
+  public void run() {
 
-        List<File> sourceFiles = new LinkedList<>();
+    Objects.requireNonNull(config);
 
-        for (Iterator<URL> sources = config.getSource(); sources.hasNext();) {
-            URL source = sources.next();
-            sourceFiles.add(URLUtil.getFileFromURL(source));
-        }
+    generate(config);
 
-        LOGGER.info("Seeded with {} source paths:", sourceFiles.size());
+  }
 
-        resolveRecursive((GenerationConfig)config, sourceFiles);
+  /**
+   * run generate using supplied StreamsElasticsearchGenerationConfig.
+   * @param config StreamsElasticsearchGenerationConfig
+   */
+  public void generate(StreamsElasticsearchGenerationConfig config) {
 
-        LOGGER.info("Resolved {} schema files:", sourceFiles.size());
+    List<File> sourceFiles = new LinkedList<>();
 
-        for (File item : sourceFiles) {
-            schemaStore.create(item.toURI());
-        }
-
-        LOGGER.info("Identified {} objects:", schemaStore.getSize());
+    for (Iterator<URL> sources = config.getSource(); sources.hasNext();) {
+      URL source = sources.next();
+      sourceFiles.add(URLUtil.getFileFromURL(source));
+    }
 
-        StringBuilder typesContent = new StringBuilder();
+    LOGGER.info("Seeded with {} source paths:", sourceFiles.size());
 
-        for (Iterator<Schema> schemaIterator = 
schemaStore.getSchemaIterator(); schemaIterator.hasNext(); ) {
-            Schema schema = schemaIterator.next();
-            currentDepth = 0;
-            if( schema.getURI().getScheme().equals("file")) {
-                String inputFile = schema.getURI().getPath();
-                String resourcePath = dropSourcePathPrefix(inputFile, 
config.getSourceDirectory());
-                for (String sourcePath : config.getSourcePaths()) {
-                    resourcePath = dropSourcePathPrefix(resourcePath, 
sourcePath);
-                }
-                String outputFile = config.getTargetDirectory() + "/" + 
resourcePath;
+    resolveRecursive((GenerationConfig)config, sourceFiles);
 
-                LOGGER.info("Processing {}:", resourcePath);
+    LOGGER.info("Resolved {} schema files:", sourceFiles.size());
 
-                String resourceId = schemaSymbol(schema);
+    for (File item : sourceFiles) {
+      schemaStore.create(item.toURI());
+    }
 
-                String resourceContent = generateResource(schema, resourceId);
+    LOGGER.info("Identified {} objects:", schemaStore.getSize());
 
-                if( !Strings.isNullOrEmpty(resourceContent))
-                    writeFile(outputFile, resourceContent);
+    StringBuilder typesContent = new StringBuilder();
 
-                LOGGER.info("Wrote {}:", outputFile);
-            }
+    for (Iterator<Schema> schemaIterator = schemaStore.getSchemaIterator(); schemaIterator.hasNext(); ) {
+      Schema schema = schemaIterator.next();
+      currentDepth = 0;
+      if ( schema.getUri().getScheme().equals("file")) {
+        String inputFile = schema.getUri().getPath();
+        String resourcePath = dropSourcePathPrefix(inputFile, config.getSourceDirectory());
+        for (String sourcePath : config.getSourcePaths()) {
+          resourcePath = dropSourcePathPrefix(resourcePath, sourcePath);
         }
+        String outputFile = config.getTargetDirectory() + "/" + resourcePath;
 
-    }
-
-    public String generateResource(Schema schema, String resourceId) {
-        StringBuilder resourceBuilder = new StringBuilder();
+        LOGGER.info("Processing {}:", resourcePath);
 
-        ObjectNode rootNode = (ObjectNode) schema.getContent();
+        String resourceId = schemaSymbol(schema);
 
-        // remove java*
-        // remove description
-        // resolve all $ref
-        // replace format: date with type: date
-        // replace format: date-time with type: date
-        // replace array of primitive with just primitive
+        String resourceContent = generateResource(schema, resourceId);
 
-        try {
-            String objectString = MAPPER.writeValueAsString(rootNode);
-            resourceBuilder.append(objectString);
-        } catch (JsonProcessingException e) {
-            LOGGER.error("{}: {}", e.getClass().getName(), e);
+        if ( !Strings.isNullOrEmpty(resourceContent)) {
+          writeFile(outputFile, resourceContent);
         }
-        return resourceBuilder.toString();
-    }
 
-    public StringBuilder appendRootObject(StringBuilder builder, Schema 
schema, String resourceId, Character seperator) {
-        ObjectNode propertiesNode = schemaStore.resolveProperties(schema, 
null, resourceId);
-        if( propertiesNode.get("id") != null ) {
-            builder.append("id text PRIMARY KEY,");
-            builder.append(LS);
-            propertiesNode.remove("id");
-        }
-        if( propertiesNode.isObject() && propertiesNode.size() > 0) {
-            builder = appendPropertiesNode(builder, schema, propertiesNode, 
seperator);
-        }
-        return builder;
+        LOGGER.info("Wrote {}:", outputFile);
+      }
     }
 
-    private StringBuilder appendValueField(StringBuilder builder, Schema 
schema, String fieldId, FieldType fieldType, Character seperator) {
-        // safe to append nothing
-        Objects.requireNonNull(builder);
-        builder.append(cqlEscape(fieldId));
-        builder.append(seperator);
-        builder.append(cqlType(fieldType));
-        return builder;
+  }
+
+  /**
+   * generateResource String from schema and resourceId.
+   * @param schema Schema
+   * @param resourceId String
+   * @return mapping
+   */
+  public String generateResource(Schema schema, String resourceId) {
+    StringBuilder resourceBuilder = new StringBuilder();
+
+    ObjectNode rootNode = (ObjectNode) schema.getContent();
+
+    // remove java*
+    // remove description
+    // resolve all $ref
+    // replace format: date with type: date
+    // replace format: date-time with type: date
+    // replace array of primitive with just primitive
+
+    try {
+      String objectString = MAPPER.writeValueAsString(rootNode);
+      resourceBuilder.append(objectString);
+    } catch (JsonProcessingException ex) {
+      LOGGER.error("{}: {}", ex.getClass().getName(), ex);
     }
-
-    public StringBuilder appendArrayItems(StringBuilder builder, Schema 
schema, String fieldId, ObjectNode itemsNode, Character seperator) {
-        // not safe to append nothing
-        Objects.requireNonNull(builder);
-        if( itemsNode == null ) return builder;
-        if( itemsNode.has("type")) {
-            try {
-                FieldType itemType = FieldUtil.determineFieldType(itemsNode);
-                switch( itemType ) {
-                    case OBJECT:
-                        Schema objectSchema = null;
-                        URI parentURI = null;
-                        if( itemsNode.has("$ref") || itemsNode.has("extends") 
) {
-                            JsonNode refNode = itemsNode.get("$ref");
-                            JsonNode extendsNode = itemsNode.get("extends");
-                            if (refNode != null && refNode.isValueNode())
-                                parentURI = URI.create(refNode.asText());
-                            else if (extendsNode != null && 
extendsNode.isObject())
-                                parentURI = 
URI.create(extendsNode.get("$ref").asText());
-                            URI absoluteURI;
-                            if (parentURI.isAbsolute())
-                                absoluteURI = parentURI;
-                            else {
-                                absoluteURI = 
schema.getURI().resolve(parentURI);
-                                if (!absoluteURI.isAbsolute() || 
(absoluteURI.isAbsolute() && !schemaStore.getByUri(absoluteURI).isPresent() ))
-                                    absoluteURI = 
schema.getParentURI().resolve(parentURI);
-                            }
-                            if (absoluteURI.isAbsolute()) {
-                                Optional<Schema> schemaLookup = 
schemaStore.getByUri(absoluteURI);
-                                if (schemaLookup.isPresent()) {
-                                    objectSchema = schemaLookup.get();
-                                }
-                            }
-                        }
-                        // have to resolve schema here
-
-                        builder = appendArrayObject(builder, objectSchema, 
fieldId, seperator);
-                        break;
-                    case ARRAY:
-                        ObjectNode subArrayItems = (ObjectNode) 
itemsNode.get("items");
-                        builder = appendArrayItems(builder, schema, fieldId, 
subArrayItems, seperator);
-                        break;
-                    default:
-                        builder = appendArrayField(builder, schema, fieldId, 
itemType, seperator);
-                }
-            } catch (Exception e) {
-                LOGGER.warn("No item type resolvable for {}", fieldId);
-            }
-        }
-        Objects.requireNonNull(builder);
-        return builder;
+    return resourceBuilder.toString();
+  }
+
+  protected StringBuilder appendRootObject(StringBuilder builder, Schema 
schema, String resourceId, Character seperator) {
+    ObjectNode propertiesNode = schemaStore.resolveProperties(schema, null, 
resourceId);
+    if ( propertiesNode.get("id") != null ) {
+      builder.append("id text PRIMARY KEY,");
+      builder.append(LS);
+      propertiesNode.remove("id");
     }
-
-    private StringBuilder appendArrayField(StringBuilder builder, Schema 
schema, String fieldId, FieldType fieldType, Character seperator) {
-        // safe to append nothing
-        Objects.requireNonNull(builder);
-        Objects.requireNonNull(fieldId);
-        builder.append(cqlEscape(fieldId));
-        builder.append(seperator);
-        builder.append("list<").append(cqlType(fieldType)).append(">");
-        Objects.requireNonNull(builder);
-        return builder;
+    if ( propertiesNode.isObject() && propertiesNode.size() > 0) {
+      builder = appendPropertiesNode(builder, schema, propertiesNode, 
seperator);
     }
-
-    private StringBuilder appendArrayObject(StringBuilder builder, Schema 
schema, String fieldId, Character seperator) {
-        // safe to append nothing
-        Objects.requireNonNull(builder);
-        String schemaSymbol = schemaSymbol(schema);
-        if( !Strings.isNullOrEmpty(fieldId) && schemaSymbol != null ) {
-            builder.append(cqlEscape(fieldId));
-            builder.append(seperator);
-            builder.append("list<").append(schemaSymbol).append(">");
-            builder.append(LS);
-        }
-        Objects.requireNonNull(builder);
-        return builder;
+    return builder;
+  }
+
+  private StringBuilder appendValueField(StringBuilder builder, Schema schema, 
String fieldId, FieldType fieldType, Character seperator) {
+    // safe to append nothing
+    Objects.requireNonNull(builder);
+    builder.append(cqlEscape(fieldId));
+    builder.append(seperator);
+    builder.append(cqlType(fieldType));
+    return builder;
+  }
+
+  protected StringBuilder appendArrayItems(StringBuilder builder, Schema 
schema, String fieldId, ObjectNode itemsNode, Character seperator) {
+    // not safe to append nothing
+    Objects.requireNonNull(builder);
+    if ( itemsNode == null ) {
+      return builder;
     }
-
-    private StringBuilder appendSchemaField(StringBuilder builder, Schema 
schema, String fieldId, Character seperator) {
-        // safe to append nothing
-        Objects.requireNonNull(builder);
-        String schemaSymbol = schemaSymbol(schema);
-        if( !Strings.isNullOrEmpty(fieldId) && schemaSymbol != null ) {
-            builder.append(cqlEscape(fieldId));
-            builder.append(seperator);
-            builder.append(schemaSymbol);
-        }
-        Objects.requireNonNull(builder);
-        return builder;
-    }
-
-    /*
-     can this be moved to streams-schemas if schemastore available in scope?
-     maybe an interface?
-     lot of boilerplate / reuse between plugins
-     however treatment is way different when resolving a type symbol vs 
resolving and listing fields .
-     */
-    private StringBuilder appendPropertiesNode(StringBuilder builder, Schema 
schema, ObjectNode propertiesNode, Character seperator) {
-        Objects.requireNonNull(builder);
-        Objects.requireNonNull(propertiesNode);
-        Iterator<Map.Entry<String, JsonNode>> fields = propertiesNode.fields();
-        Joiner joiner = Joiner.on(","+LS).skipNulls();
-        List<String> fieldStrings = new ArrayList<>();
-        for( ; fields.hasNext(); ) {
-            Map.Entry<String, JsonNode> field = fields.next();
-            String fieldId = field.getKey();
-            if( !config.getExclusions().contains(fieldId) && 
field.getValue().isObject()) {
-                ObjectNode fieldNode = (ObjectNode) field.getValue();
-                FieldType fieldType = FieldUtil.determineFieldType(fieldNode);
-                if (fieldType != null ) {
-                    switch (fieldType) {
-                        case ARRAY:
-                            ObjectNode itemsNode = (ObjectNode) 
fieldNode.get("items");
-                            if( currentDepth <= config.getMaxDepth()) {
-                                StringBuilder arrayItemsBuilder = 
appendArrayItems(new StringBuilder(), schema, fieldId, itemsNode, seperator);
-                                if( 
!Strings.isNullOrEmpty(arrayItemsBuilder.toString())) {
-                                    
fieldStrings.add(arrayItemsBuilder.toString());
-                                }
-                            }
-                            break;
-                        case OBJECT:
-                            Schema objectSchema = null;
-                            URI parentURI = null;
-                            if( fieldNode.has("$ref") || 
fieldNode.has("extends") ) {
-                                JsonNode refNode = fieldNode.get("$ref");
-                                JsonNode extendsNode = 
fieldNode.get("extends");
-                                if (refNode != null && refNode.isValueNode())
-                                    parentURI = URI.create(refNode.asText());
-                                else if (extendsNode != null && 
extendsNode.isObject())
-                                    parentURI = 
URI.create(extendsNode.get("$ref").asText());
-                                URI absoluteURI;
-                                if (parentURI.isAbsolute())
-                                    absoluteURI = parentURI;
-                                else {
-                                    absoluteURI = 
schema.getURI().resolve(parentURI);
-                                    if (!absoluteURI.isAbsolute() || 
(absoluteURI.isAbsolute() && !schemaStore.getByUri(absoluteURI).isPresent() ))
-                                        absoluteURI = 
schema.getParentURI().resolve(parentURI);
-                                }
-                                if (absoluteURI.isAbsolute()) {
-                                    Optional<Schema> schemaLookup = 
schemaStore.getByUri(absoluteURI);
-                                    if (schemaLookup.isPresent()) {
-                                        objectSchema = schemaLookup.get();
-                                    }
-                                }
-                            }
-                            //ObjectNode childProperties = 
schemaStore.resolveProperties(schema, fieldNode, fieldId);
-                            if( currentDepth < config.getMaxDepth()) {
-                                StringBuilder structFieldBuilder = 
appendSchemaField(new StringBuilder(), objectSchema, fieldId, seperator);
-                                if( 
!Strings.isNullOrEmpty(structFieldBuilder.toString())) {
-                                    
fieldStrings.add(structFieldBuilder.toString());
-                                }
-                            }
-                            break;
-                        default:
-                            StringBuilder valueFieldBuilder = 
appendValueField(new StringBuilder(), schema, fieldId, fieldType, seperator);
-                            if( 
!Strings.isNullOrEmpty(valueFieldBuilder.toString())) {
-                                fieldStrings.add(valueFieldBuilder.toString());
-                            }
-                    }
+    if ( itemsNode.has("type")) {
+      try {
+        FieldType itemType = FieldUtil.determineFieldType(itemsNode);
+        switch ( itemType ) {
+          case OBJECT:
+            Schema objectSchema = null;
+            URI parentUri = null;
+            if ( itemsNode.has("$ref") || itemsNode.has("extends") ) {
+              JsonNode refNode = itemsNode.get("$ref");
+              JsonNode extendsNode = itemsNode.get("extends");
+              if (refNode != null && refNode.isValueNode()) {
+                parentUri = URI.create(refNode.asText());
+              } else if (extendsNode != null && extendsNode.isObject()) {
+                parentUri = URI.create(extendsNode.get("$ref").asText());
+              }
+              URI absoluteUri;
+              if (parentUri.isAbsolute()) {
+                absoluteUri = parentUri;
+              } else {
+                absoluteUri = schema.getUri().resolve(parentUri);
+                if (!absoluteUri.isAbsolute() || (absoluteUri.isAbsolute() && 
!schemaStore.getByUri(absoluteUri).isPresent() )) {
+                  absoluteUri = schema.getParentUri().resolve(parentUri);
+                }
+              }
+              if (absoluteUri.isAbsolute()) {
+                Optional<Schema> schemaLookup = 
schemaStore.getByUri(absoluteUri);
+                if (schemaLookup.isPresent()) {
+                  objectSchema = schemaLookup.get();
                 }
+              }
             }
+            // have to resolve schema here
+
+            builder = appendArrayObject(builder, objectSchema, fieldId, 
seperator);
+            break;
+          case ARRAY:
+            ObjectNode subArrayItems = (ObjectNode) itemsNode.get("items");
+            builder = appendArrayItems(builder, schema, fieldId, 
subArrayItems, seperator);
+            break;
+          default:
+            builder = appendArrayField(builder, schema, fieldId, itemType, 
seperator);
         }
-        builder.append(joiner.join(fieldStrings)).append(LS);
-        Objects.requireNonNull(builder);
-        return builder;
+      } catch (Exception ex) {
+        LOGGER.warn("No item type resolvable for {}", fieldId);
+      }
     }
-
-    private static String cqlEscape( String fieldId ) {
-        return "`"+fieldId+"`";
+    Objects.requireNonNull(builder);
+    return builder;
+  }
+
+  private StringBuilder appendArrayField(StringBuilder builder, Schema schema, 
String fieldId, FieldType fieldType, Character seperator) {
+    // safe to append nothing
+    Objects.requireNonNull(builder);
+    Objects.requireNonNull(fieldId);
+    builder.append(cqlEscape(fieldId));
+    builder.append(seperator);
+    builder.append("list<").append(cqlType(fieldType)).append(">");
+    Objects.requireNonNull(builder);
+    return builder;
+  }
+
+  private StringBuilder appendArrayObject(StringBuilder builder, Schema 
schema, String fieldId, Character seperator) {
+    // safe to append nothing
+    Objects.requireNonNull(builder);
+    String schemaSymbol = schemaSymbol(schema);
+    if ( !Strings.isNullOrEmpty(fieldId) && schemaSymbol != null ) {
+      builder.append(cqlEscape(fieldId));
+      builder.append(seperator);
+      builder.append("list<").append(schemaSymbol).append(">");
+      builder.append(LS);
     }
-
-    private static String cqlType( FieldType fieldType ) {
-        switch( fieldType ) {
-            case STRING:
-                return "text";
-            case INTEGER:
-                return "int";
-            case NUMBER:
-                return "double";
-            case OBJECT:
-                return "tuple";
+    Objects.requireNonNull(builder);
+    return builder;
+  }
+
+  private StringBuilder appendSchemaField(StringBuilder builder, Schema 
schema, String fieldId, Character seperator) {
+    // safe to append nothing
+    Objects.requireNonNull(builder);
+    String schemaSymbol = schemaSymbol(schema);
+    if ( !Strings.isNullOrEmpty(fieldId) && schemaSymbol != null ) {
+      builder.append(cqlEscape(fieldId));
+      builder.append(seperator);
+      builder.append(schemaSymbol);
+    }
+    Objects.requireNonNull(builder);
+    return builder;
+  }
+
+  /*
+   can this be moved to streams-schemas if schemastore available in scope?
+   maybe an interface?
+   lot of boilerplate / reuse between plugins
+   however treatment is way different when resolving a type symbol vs 
resolving and listing fields .
+   */
+  private StringBuilder appendPropertiesNode(StringBuilder builder, Schema 
schema, ObjectNode propertiesNode, Character seperator) {
+    Objects.requireNonNull(builder);
+    Objects.requireNonNull(propertiesNode);
+    Iterator<Map.Entry<String, JsonNode>> fields = propertiesNode.fields();
+    Joiner joiner = Joiner.on("," + LS).skipNulls();
+    List<String> fieldStrings = new ArrayList<>();
+    for ( ; fields.hasNext(); ) {
+      Map.Entry<String, JsonNode> field = fields.next();
+      String fieldId = field.getKey();
+      if ( !config.getExclusions().contains(fieldId) && 
field.getValue().isObject()) {
+        ObjectNode fieldNode = (ObjectNode) field.getValue();
+        FieldType fieldType = FieldUtil.determineFieldType(fieldNode);
+        if (fieldType != null ) {
+          switch (fieldType) {
             case ARRAY:
-                return "list";
+              ObjectNode itemsNode = (ObjectNode) fieldNode.get("items");
+              if ( currentDepth <= config.getMaxDepth()) {
+                StringBuilder arrayItemsBuilder = appendArrayItems(new 
StringBuilder(), schema, fieldId, itemsNode, seperator);
+                if ( !Strings.isNullOrEmpty(arrayItemsBuilder.toString())) {
+                  fieldStrings.add(arrayItemsBuilder.toString());
+                }
+              }
+              break;
+            case OBJECT:
+              Schema objectSchema = null;
+              URI parentUri = null;
+              if ( fieldNode.has("$ref") || fieldNode.has("extends") ) {
+                JsonNode refNode = fieldNode.get("$ref");
+                JsonNode extendsNode = fieldNode.get("extends");
+                if (refNode != null && refNode.isValueNode()) {
+                  parentUri = URI.create(refNode.asText());
+                } else if (extendsNode != null && extendsNode.isObject()) {
+                  parentUri = URI.create(extendsNode.get("$ref").asText());
+                }
+                URI absoluteUri;
+                if (parentUri.isAbsolute()) {
+                  absoluteUri = parentUri;
+                } else {
+                  absoluteUri = schema.getUri().resolve(parentUri);
+                  if (!absoluteUri.isAbsolute() || (absoluteUri.isAbsolute() 
&& !schemaStore.getByUri(absoluteUri).isPresent() )) {
+                    absoluteUri = schema.getParentUri().resolve(parentUri);
+                  }
+                }
+                if (absoluteUri.isAbsolute()) {
+                  Optional<Schema> schemaLookup = 
schemaStore.getByUri(absoluteUri);
+                  if (schemaLookup.isPresent()) {
+                    objectSchema = schemaLookup.get();
+                  }
+                }
+              }
+              //ObjectNode childProperties = 
schemaStore.resolveProperties(schema, fieldNode, fieldId);
+              if ( currentDepth < config.getMaxDepth()) {
+                StringBuilder structFieldBuilder = appendSchemaField(new 
StringBuilder(), objectSchema, fieldId, seperator);
+                if ( !Strings.isNullOrEmpty(structFieldBuilder.toString())) {
+                  fieldStrings.add(structFieldBuilder.toString());
+                }
+              }
+              break;
             default:
-                return fieldType.name().toUpperCase();
+              StringBuilder valueFieldBuilder = appendValueField(new 
StringBuilder(), schema, fieldId, fieldType, seperator);
+              if ( !Strings.isNullOrEmpty(valueFieldBuilder.toString())) {
+                fieldStrings.add(valueFieldBuilder.toString());
+              }
+          }
         }
+      }
+    }
+    builder.append(joiner.join(fieldStrings)).append(LS);
+    Objects.requireNonNull(builder);
+    return builder;
+  }
+
+  private static String cqlEscape( String fieldId ) {
+    return "`" + fieldId + "`";
+  }
+
+  private static String cqlType( FieldType fieldType ) {
+    switch ( fieldType ) {
+      case STRING:
+        return "text";
+      case INTEGER:
+        return "int";
+      case NUMBER:
+        return "double";
+      case OBJECT:
+        return "tuple";
+      case ARRAY:
+        return "list";
+      default:
+        return fieldType.name().toUpperCase();
     }
+  }
 
-    private String schemaSymbol( Schema schema ) {
-        if (schema == null) return null;
-        // this needs to return whatever
-        if (schema.getURI().getScheme().equals("file")) {
-            String inputFile = schema.getURI().getPath();
-            String resourcePath = dropSourcePathPrefix(inputFile, 
config.getSourceDirectory());
-            for (String sourcePath : config.getSourcePaths()) {
-                resourcePath = dropSourcePathPrefix(resourcePath, sourcePath);
-            }
-            return dropExtension(resourcePath).replace("/", "_");
-        } else {
-            return "IDK";
-        }
+  private String schemaSymbol( Schema schema ) {
+    if (schema == null) {
+      return null;
+    }
+    // this needs to return whatever
+    if (schema.getUri().getScheme().equals("file")) {
+      String inputFile = schema.getUri().getPath();
+      String resourcePath = dropSourcePathPrefix(inputFile, 
config.getSourceDirectory());
+      for (String sourcePath : config.getSourcePaths()) {
+        resourcePath = dropSourcePathPrefix(resourcePath, sourcePath);
+      }
+      return dropExtension(resourcePath).replace("/", "_");
+    } else {
+      return "IDK";
     }
+  }
 }

Reply via email to