PR feedback

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5a65dc51
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5a65dc51
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5a65dc51

Branch: refs/heads/master
Commit: 5a65dc515286366beefaa5dfb85e6438d64b9a5f
Parents: d082e01
Author: sblackmon <sblack...@w2odigital.com>
Authored: Thu Dec 11 13:55:52 2014 -0600
Committer: sblackmon <sblack...@w2odigital.com>
Committed: Thu Dec 11 13:55:52 2014 -0600

----------------------------------------------------------------------
 .../converter/ActivityConverterProcessor.java   | 90 +++++++++++---------
 .../test/CustomDocumentClassifier.java          |  3 +-
 2 files changed, 50 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5a65dc51/streams-components/streams-converters/src/main/java/org/apache/streams/converter/ActivityConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/ActivityConverterProcessor.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/ActivityConverterProcessor.java
index 68c8fd9..81d83ca 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/ActivityConverterProcessor.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/ActivityConverterProcessor.java
@@ -32,6 +32,7 @@ import org.apache.streams.data.util.ActivityUtil;
 import org.apache.streams.exceptions.ActivityConversionException;
 import org.apache.streams.pojo.json.Activity;
 import org.reflections.Reflections;
+import org.reflections.scanners.SubTypesScanner;
 import org.reflections.util.ClasspathHelper;
 import org.reflections.util.ConfigurationBuilder;
 import org.slf4j.Logger;
@@ -72,8 +73,7 @@ public class ActivityConverterProcessor implements StreamsProcessor {
     }
 
     public ActivityConverterProcessor(ActivityConverterProcessorConfiguration configuration) {
-        this.classifiers = Lists.newArrayList();
-        this.converters = Lists.newArrayList();
+        this();
         this.configuration = configuration;
     }
 
@@ -112,57 +112,43 @@ public class ActivityConverterProcessor implements StreamsProcessor {
 
                 Object typedDoc = typedDocs.get(converter.requiredClass());
 
-                // if the document can be typed as the required class
-                if( typedDoc != null ) {
+                List<Activity> activities = applyConverter(converter, typedDoc);
 
-                    StreamsDatum datum = DatumUtils.cloneDatum(entry);
-
-                    // let the converter create activities if it can
-                    List<Activity> activities;
-                    try {
-                        activities = convertToActivity(converter, typedDoc);
+            }
 
-                        if( activities != null && activities.size() > 0) {
+            for (Activity activity : activities) {
+                StreamsDatum datum = DatumUtils.cloneDatum(entry);
+                datum.setId(activity.getId());
+                result.add(datum);
+            }
 
-                            for (Activity activity : activities) {
+        } catch( Exception e ) {
+            LOGGER.warn("General exception in process! " + e.getMessage());
+        } finally {
+            return result;
+        }
 
-                                if (activity != null) {
+    }
 
-                                    // only accept valid activities
-                                    //   this primitive validity check should be replaced with
-                                    //   one that applies javax.validation to JSR303 annotations
-                                    //   on the Activity json schema once a suitable implementation
-                                    //   is found.
-                                    if (ActivityUtil.isValid(activity)) {
-                                        datum.setDocument(activity);
-                                        datum.setId(activity.getId());
-                                        result.add(datum);
-                                    } else {
-                                        LOGGER.debug(converter.getClass().getCanonicalName() + " produced invalid Activity converting " + converter.requiredClass().getClass().getCanonicalName());
-                                    }
+    protected List<Activity> applyConverter(ActivityConverter converter, Object typedDoc) {
 
-                                } else {
-                                    LOGGER.debug(converter.getClass().getCanonicalName() + " returned null converting " + converter.requiredClass().getClass().getCanonicalName() + " to Activity");
-                                }
+        // if the document can be typed as the required class
+        if( typedDoc != null ) {
 
-                            }
-                        }
-                    } catch( Exception e ) {
-                        LOGGER.debug("convertToActivity caught exception " + e.getMessage());
-                    }
+            // let the converter create activities if it can
+            List<Activity> activities;
+            try {
+                activities = convertToActivity(converter, typedDoc);
 
+                if( activities != null && activities.size() > 0) {
 
 
                 }
-
+            } catch( Exception e ) {
+                LOGGER.debug("convertToActivity caught exception " + e.getMessage());
             }
 
-        } catch( Exception e ) {
-            LOGGER.warn("General exception in process! " + e.getMessage());
-        } finally {
-            return result;
         }
-
     }
 
     protected List<Activity> convertToActivity(ActivityConverter converter, Object document) {
@@ -173,6 +159,26 @@ public class ActivityConverterProcessor implements StreamsProcessor {
         } catch (ActivityConversionException e1) {
             LOGGER.debug(converter.getClass().getCanonicalName() + " unable to convert " + converter.requiredClass().getClass().getCanonicalName() + " to Activity");
         }
+
+        for (Activity activity : activities) {
+
+            if (activity != null) {
+
+                // only accept valid activities
+                //   this primitive validity check should be replaced with
+                //   one that applies javax.validation to JSR303 annotations
+                //   on the Activity json schema once a suitable implementation
+                //   is found.
+                if (!ActivityUtil.isValid(activity)) {
+                    activities.remove(activity);
+                    LOGGER.debug(converter.getClass().getCanonicalName() + " produced invalid Activity converting " + converter.requiredClass().getClass().getCanonicalName());
+                }
+
+            } else {
+                LOGGER.debug(converter.getClass().getCanonicalName() + " returned null converting " + converter.requiredClass().getClass().getCanonicalName() + " to Activity");
+            }
+
+        }
         return activities;
 
     }
@@ -209,9 +215,9 @@ public class ActivityConverterProcessor implements StreamsProcessor {
 
     @Override
     public void prepare(Object configurationObject) {
-//        Preconditions.checkArgument(configurationObject instanceof ActivityConverterProcessorConfiguration);
-//        ActivityConverterProcessorConfiguration configuration = (ActivityConverterProcessorConfiguration) configurationObject;
-        Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forManifest()));
+        Reflections reflections = new Reflections(new ConfigurationBuilder()
+                .setUrls(ClasspathHelper.forPackage("org.apache.streams.data"))
+                .setScanners(new SubTypesScanner()));
         if (configuration.getClassifiers().size() > 0) {
             for( DocumentClassifier classifier : configuration.getClassifiers()) {
                 try {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5a65dc51/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/CustomDocumentClassifier.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/CustomDocumentClassifier.java b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/CustomDocumentClassifier.java
index 102c8ba..685ce8c 100644
--- a/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/CustomDocumentClassifier.java
+++ b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/CustomDocumentClassifier.java
@@ -63,8 +63,9 @@ public class CustomDocumentClassifier implements DocumentClassifier {
         } else if( document instanceof ObjectNode ){
             classes.add(ObjectNode.class);
             possibleMatchDocument = this.mapper.convertValue((ObjectNode)document, CustomType.class);
-            if(possibleMatchDocument != null && possibleMatchDocument.getTest() != null)
+            if(possibleMatchDocument != null && possibleMatchDocument.getTest() != null) {
                 classes.add(CustomType.class);
+            }
         } else {
             classes.add(document.getClass());
         }

Reply via email to