http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java
index 677b22f..3620346 100644
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java
@@ -19,6 +19,12 @@
 
 package com.google.gplus.provider;
 
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import 
org.apache.streams.util.api.requests.backoff.impl.ConstantTimeBackOffStrategy;
+
 import com.carrotsearch.randomizedtesting.RandomizedTest;
 import com.carrotsearch.randomizedtesting.annotations.Repeat;
 import com.fasterxml.jackson.databind.DeserializationFeature;
@@ -29,11 +35,7 @@ import com.google.api.services.plus.model.Activity;
 import com.google.api.services.plus.model.ActivityFeed;
 import com.google.common.collect.Lists;
 import com.google.gplus.serializer.util.GPlusActivityDeserializer;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-import 
org.apache.streams.util.api.requests.backoff.impl.ConstantTimeBackOffStrategy;
+
 import org.joda.time.DateTime;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -55,231 +57,256 @@ import static org.mockito.Mockito.when;
  */
 public class TestGPlusUserActivityCollector extends RandomizedTest {
 
+  private static final String ACTIVITY_TEMPLATE = "{ \"kind\": 
\"plus#activity\", \"etag\": 
\"\\\"Vea_b94Y77GDGgRK7gFNPnolKQw/v1-6aVSBGT4qiStMoz7f2_AN2fM\\\"\", \"title\": 
\"\", \"published\": \"%s\", \"updated\": \"2014-10-27T06:26:33.927Z\", \"id\": 
\"z13twrlznpvtzz52w22mdt1y0k3of1djw04\", \"url\": 
\"https://plus.google.com/116771159471120611293/posts/GR7CGR8N5VL\";, \"actor\": 
{ \"id\": \"116771159471120611293\", \"displayName\": \"displayName\", \"url\": 
\"https://plus.google.com/116771159471120611293\";, \"image\": { \"url\": 
\"https://lh6.googleusercontent.com/-C0fiZBxdvw0/AAAAAAAAAAI/AAAAAAAAJ5k/K4pgR3_-_ms/photo.jpg?sz=50\";
 } }, \"verb\": \"share\", \"object\": { \"objectType\": \"activity\", \"id\": 
\"z13zgvtiurjgfti1v234iflghvq2c1dge04\", \"actor\": { \"id\": 
\"104954254300557350002\", \"displayName\": \"displayName\", \"url\": 
\"https://plus.google.com/104954254300557350002\";, \"image\": { \"url\": 
\"https://lh4.googleusercontent.com/-SO1scj4p2LA/AAAAAAAAAAI/AAAAAAAAI-s/efA
 9LBVe144/photo.jpg?sz=50\" } }, \"content\": \"\", \"url\": 
\"https://plus.google.com/104954254300557350002/posts/AwewXhtn7ws\";, 
\"replies\": { \"totalItems\": 0, \"selfLink\": 
\"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/comments\";
 }, \"plusoners\": { \"totalItems\": 9, \"selfLink\": 
\"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/people/plusoners\";
 }, \"resharers\": { \"totalItems\": 0, \"selfLink\": 
\"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/people/resharers\";
 }, \"attachments\": [ { \"objectType\": \"photo\", \"id\": 
\"104954254300557350002.6074732746360957410\", \"content\": \"26/10/2014 - 1\", 
\"url\": 
\"https://plus.google.com/photos/104954254300557350002/albums/6074732747132702225/6074732746360957410\";,
 \"image\": { \"url\": 
\"https://lh4.googleusercontent.com/-oO3fnARlDm0/VE3JP1xHKeI/AAAAAAAAeCY/-X2jzc6HruA/w506-h750/2014%2B-%2B1\";,
 \"type\": \"ima
 ge/jpeg\" }, \"fullImage\": { \"url\": 
\"https://lh4.googleusercontent.com/-oO3fnARlDm0/VE3JP1xHKeI/AAAAAAAAeCY/-X2jzc6HruA/w600-h1141/2014%2B-%2B1\";,
 \"type\": \"image/jpeg\", \"height\": 1141, \"width\": 600 } } ] }, 
\"annotation\": \"Truth 😜\", \"provider\": { \"title\": \"Reshared Post\" }, 
\"access\": { \"kind\": \"plus#acl\", \"description\": \"Public\", \"items\": [ 
{ \"type\": \"public\" } ] } }";
+  private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
+  private static final String IN_RANGE_IDENTIFIER = "data in range";
 
-    private static final String ACTIVITY_TEMPLATE = "{ \"kind\": 
\"plus#activity\", \"etag\": 
\"\\\"Vea_b94Y77GDGgRK7gFNPnolKQw/v1-6aVSBGT4qiStMoz7f2_AN2fM\\\"\", \"title\": 
\"\", \"published\": \"%s\", \"updated\": \"2014-10-27T06:26:33.927Z\", \"id\": 
\"z13twrlznpvtzz52w22mdt1y0k3of1djw04\", \"url\": 
\"https://plus.google.com/116771159471120611293/posts/GR7CGR8N5VL\";, \"actor\": 
{ \"id\": \"116771159471120611293\", \"displayName\": \"displayName\", \"url\": 
\"https://plus.google.com/116771159471120611293\";, \"image\": { \"url\": 
\"https://lh6.googleusercontent.com/-C0fiZBxdvw0/AAAAAAAAAAI/AAAAAAAAJ5k/K4pgR3_-_ms/photo.jpg?sz=50\";
 } }, \"verb\": \"share\", \"object\": { \"objectType\": \"activity\", \"id\": 
\"z13zgvtiurjgfti1v234iflghvq2c1dge04\", \"actor\": { \"id\": 
\"104954254300557350002\", \"displayName\": \"displayName\", \"url\": 
\"https://plus.google.com/104954254300557350002\";, \"image\": { \"url\": 
\"https://lh4.googleusercontent.com/-SO1scj4p2LA/AAAAAAAAAAI/AAAAAAAAI-s/e
 fA9LBVe144/photo.jpg?sz=50\" } }, \"content\": \"\", \"url\": 
\"https://plus.google.com/104954254300557350002/posts/AwewXhtn7ws\";, 
\"replies\": { \"totalItems\": 0, \"selfLink\": 
\"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/comments\";
 }, \"plusoners\": { \"totalItems\": 9, \"selfLink\": 
\"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/people/plusoners\";
 }, \"resharers\": { \"totalItems\": 0, \"selfLink\": 
\"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/people/resharers\";
 }, \"attachments\": [ { \"objectType\": \"photo\", \"id\": 
\"104954254300557350002.6074732746360957410\", \"content\": \"26/10/2014 - 1\", 
\"url\": 
\"https://plus.google.com/photos/104954254300557350002/albums/6074732747132702225/6074732746360957410\";,
 \"image\": { \"url\": 
\"https://lh4.googleusercontent.com/-oO3fnARlDm0/VE3JP1xHKeI/AAAAAAAAeCY/-X2jzc6HruA/w506-h750/2014%2B-%2B1\";,
 \"type\": \"i
 mage/jpeg\" }, \"fullImage\": { \"url\": 
\"https://lh4.googleusercontent.com/-oO3fnARlDm0/VE3JP1xHKeI/AAAAAAAAeCY/-X2jzc6HruA/w600-h1141/2014%2B-%2B1\";,
 \"type\": \"image/jpeg\", \"height\": 1141, \"width\": 600 } } ] }, 
\"annotation\": \"Truth 😜\", \"provider\": { \"title\": \"Reshared Post\" }, 
\"access\": { \"kind\": \"plus#acl\", \"description\": \"Public\", \"items\": [ 
{ \"type\": \"public\" } ] } }";
-    private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
-    private static final String IN_RANGE_IDENTIFIER = "data in range";
-
+  static {
+    SimpleModule simpleModule = new SimpleModule();
+    simpleModule.addDeserializer(Activity.class, new 
GPlusActivityDeserializer());
+    MAPPER.registerModule(simpleModule);
+    MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+  }
 
-    static {
-        SimpleModule simpleModule = new SimpleModule();
-        simpleModule.addDeserializer(Activity.class, new 
GPlusActivityDeserializer());
-        MAPPER.registerModule(simpleModule);
-        MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+  /**
+   * Creates a randomized activity and randomized date range.
+   *
+   * <p/>
+   * The activity feed is separated into three chunks,
+   * |. . . data too recent to be in date range . . .||. . . data in date 
range. . .||. . . data too old to be in date range|
+   * [index 0, 
.............................................................................................,
 index length-1]
+   *
+   * <p/>
+   * Inside of those chunks data has no order, but the list is ordered by 
those three chunks.
+   *
+   * <p/>
+   * The test checks that only the data inside the date range makes it onto 
the output queue.
+   */
+  @Test
+  @Repeat(iterations = 3)
+  public void testWithBeforeAndAfterDates() throws InterruptedException {
+    //initialize counts assuming no date ranges will be used
+    int numActivities = randomIntBetween(0, 1000);
+    int numActivitiesInDateRange = numActivities;
+    int numberOutOfRange = 0;
+    int numBeforeRange = 0;
+    int numAfterRange = 0;
+    //determine if date ranges will be used
+    DateTime beforeDate = null;
+    DateTime afterDate = null;
+    if (randomInt() % 2 == 0) {
+      beforeDate = DateTime.now().minusDays(randomIntBetween(1,5));
     }
-
-    /**
-     * Creates a randomized activity and randomized date range.
-     * The activity feed is separated into three chunks,
-     * |. . . data too recent to be in date range . . .||. . . data in date 
range. . .||. . . data too old to be in date range|
-     * [index 0, 
.............................................................................................,
 index length-1]
-     * Inside of those chunks data has no order, but the list is ordered by 
those three chunks.
-     *
-     * The test will check to see if the num of data in the date range make 
onto the output queue.
-     */
-    @Test
-    @Repeat(iterations = 3)
-    public void testWithBeforeAndAfterDates() throws InterruptedException {
-        //initialize counts assuming no date ranges will be used
-        int numActivities = randomIntBetween(0, 1000);
-        int numActivitiesInDateRange = numActivities;
-        int numberOutOfRange = 0;
-        int numBerforeRange = 0;
-        int numAfterRange = 0;
-        //determine if date ranges will be used
-        DateTime beforeDate = null;
-        DateTime afterDate = null;
-        if(randomInt() % 2 == 0) {
-            beforeDate = DateTime.now().minusDays(randomIntBetween(1,5));
-        }
-        if(randomInt() % 2 == 0) {
-            if(beforeDate == null) {
-                afterDate = DateTime.now().minusDays(randomIntBetween(1, 10));
-            } else {
-                afterDate = beforeDate.minusDays(randomIntBetween(1, 10));
-            }
-        }
-        //update counts if date ranges are going to be used.
-        if(beforeDate != null || afterDate != null ) { //assign amount to be 
in range
-            numActivitiesInDateRange = randomIntBetween(0, numActivities);
-            numberOutOfRange = numActivities - numActivitiesInDateRange;
-        }
-        if(beforeDate == null && afterDate != null) { //assign all out of 
range to be before the start of the range
-            numBerforeRange = numberOutOfRange;
-        } else if(beforeDate != null && afterDate == null) { //assign all out 
of range to be after the start of the range
-            numAfterRange = numberOutOfRange;
-        } else if(beforeDate != null && afterDate != null) { //assign half 
before range and half after the range
-            numAfterRange = (numberOutOfRange / 2) + (numberOutOfRange % 2);
-            numBerforeRange = numberOutOfRange / 2;
-        }
-
-        Plus plus = createMockPlus(numBerforeRange, numAfterRange, 
numActivitiesInDateRange, afterDate, beforeDate);
-        BackOffStrategy strategy = new ConstantTimeBackOffStrategy(1);
-        BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>();
-        UserInfo userInfo = new UserInfo();
-        userInfo.setUserId("A");
-        userInfo.setAfterDate(afterDate);
-        userInfo.setBeforeDate(beforeDate);
-        GPlusUserActivityCollector collector = new 
GPlusUserActivityCollector(plus, datums, strategy, userInfo);
-        collector.run();
-
-        assertEquals(numActivitiesInDateRange, datums.size());
-        while(!datums.isEmpty()) {
-            StreamsDatum datum = datums.take();
-            assertNotNull(datum);
-            assertNotNull(datum.getDocument());
-            assertTrue(datum.getDocument() instanceof String);
-            
assertTrue(((String)datum.getDocument()).contains(IN_RANGE_IDENTIFIER)); //only 
in range documents are on the out going queue.
-        }
+    if (randomInt() % 2 == 0) {
+      if (beforeDate == null) {
+        afterDate = DateTime.now().minusDays(randomIntBetween(1, 10));
+      } else {
+        afterDate = beforeDate.minusDays(randomIntBetween(1, 10));
+      }
+    }
+    //update counts if date ranges are going to be used.
+    if (beforeDate != null || afterDate != null ) { //assign amount to be in 
range
+      numActivitiesInDateRange = randomIntBetween(0, numActivities);
+      numberOutOfRange = numActivities - numActivitiesInDateRange;
+    }
+    if (beforeDate == null && afterDate != null) { //assign all out of range 
to be before the start of the range
+      numBeforeRange = numberOutOfRange;
+    } else if (beforeDate != null && afterDate == null) { //assign all out of 
range to be after the end of the range
+      numAfterRange = numberOutOfRange;
+    } else if (beforeDate != null && afterDate != null) { //assign half before 
range and half after the range
+      numAfterRange = (numberOutOfRange / 2) + (numberOutOfRange % 2);
+      numBeforeRange = numberOutOfRange / 2;
     }
 
+    Plus plus = createMockPlus(numBeforeRange, numAfterRange, 
numActivitiesInDateRange, afterDate, beforeDate);
+    BackOffStrategy strategy = new ConstantTimeBackOffStrategy(1);
+    BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>();
+    UserInfo userInfo = new UserInfo();
+    userInfo.setUserId("A");
+    userInfo.setAfterDate(afterDate);
+    userInfo.setBeforeDate(beforeDate);
+    GPlusUserActivityCollector collector = new 
GPlusUserActivityCollector(plus, datums, strategy, userInfo);
+    collector.run();
 
-    private Plus createMockPlus(final int numBefore, final int numAfter, final 
int numInRange, final DateTime after, final DateTime before) {
-        Plus plus = mock(Plus.class);
-        final Plus.Activities activities = createMockPlusActivities(numBefore, 
numAfter, numInRange, after, before);
-        doAnswer(new Answer() {
-            @Override
-            public Plus.Activities answer(InvocationOnMock invocationOnMock) 
throws Throwable {
-                return activities;
-            }
-        }).when(plus).activities();
-        return plus;
+    assertEquals(numActivitiesInDateRange, datums.size());
+    while (!datums.isEmpty()) {
+      StreamsDatum datum = datums.take();
+      assertNotNull(datum);
+      assertNotNull(datum.getDocument());
+      assertTrue(datum.getDocument() instanceof String);
+      assertTrue(((String)datum.getDocument()).contains(IN_RANGE_IDENTIFIER)); 
//only in-range documents are on the outgoing queue.
     }
+  }
 
-    private Plus.Activities createMockPlusActivities(final int numBefore, 
final int numAfter, final int numInRange, final DateTime after, final DateTime 
before) {
-        Plus.Activities activities = mock(Plus.Activities.class);
-        try {
-            Plus.Activities.List list = 
createMockPlusActivitiesList(numBefore, numAfter, numInRange, after, before);
-            when(activities.list(anyString(), anyString())).thenReturn(list);
-        } catch (IOException ioe) {
-            fail("Should not have thrown exception while creating mock. : 
"+ioe.getMessage());
-        }
+
+  private Plus createMockPlus(final int numBefore, final int numAfter, final 
int numInRange, final DateTime after, final DateTime before) {
+    Plus plus = mock(Plus.class);
+    final Plus.Activities activities = createMockPlusActivities(numBefore, 
numAfter, numInRange, after, before);
+    doAnswer(new Answer() {
+      @Override
+      public Plus.Activities answer(InvocationOnMock invocationOnMock) throws 
Throwable {
         return activities;
+      }
+    }).when(plus).activities();
+    return plus;
+  }
+
+  private Plus.Activities createMockPlusActivities(
+      final int numBefore,
+      final int numAfter,
+      final int numInRange,
+      final DateTime after,
+      final DateTime before) {
+    Plus.Activities activities = mock(Plus.Activities.class);
+    try {
+      Plus.Activities.List list = createMockPlusActivitiesList(numBefore, 
numAfter, numInRange, after, before);
+      when(activities.list(anyString(), anyString())).thenReturn(list);
+    } catch (IOException ioe) {
+      fail("Should not have thrown exception while creating mock. : " + 
ioe.getMessage());
     }
+    return activities;
+  }
 
-    private Plus.Activities.List createMockPlusActivitiesList(final int 
numBefore, final int numAfter, final int numInRange, final DateTime after, 
final DateTime before) {
-        Plus.Activities.List list = mock(Plus.Activities.List.class);
-        when(list.setMaxResults(anyLong())).thenReturn(list);
-        when(list.setPageToken(anyString())).thenReturn(list);
-        ActivityFeedAnswer answer = new ActivityFeedAnswer(numBefore, 
numAfter, numInRange, after, before);
-        try {
-            doAnswer(answer).when(list).execute();
-        } catch (IOException ioe) {
-            fail("Should not have thrown exception while creating mock. : 
"+ioe.getMessage());
-        }
-        return list;
+  private Plus.Activities.List createMockPlusActivitiesList(
+      final int numBefore,
+      final int numAfter,
+      final int numInRange,
+      final DateTime after,
+      final DateTime before) {
+    Plus.Activities.List list = mock(Plus.Activities.List.class);
+    when(list.setMaxResults(anyLong())).thenReturn(list);
+    when(list.setPageToken(anyString())).thenReturn(list);
+    ActivityFeedAnswer answer = new ActivityFeedAnswer(numBefore, numAfter, 
numInRange, after, before);
+    try {
+      doAnswer(answer).when(list).execute();
+    } catch (IOException ioe) {
+      fail("Should not have thrown exception while creating mock. : " + 
ioe.getMessage());
     }
+    return list;
+  }
 
 
-    private static ActivityFeed createMockActivityFeed(int numBefore, int 
numAfter, int numInRange,  DateTime after, DateTime before, boolean page) {
-        ActivityFeed feed = new ActivityFeed();
-        List<Activity> list = Lists.newLinkedList();
-        for(int i=0; i < numAfter; ++i) {
-            DateTime published = before.plus(randomIntBetween(0, 
Integer.MAX_VALUE));
-            Activity activity = createActivityWithPublishedDate(published);
-            list.add(activity);
-        }
-        for(int i=0; i < numInRange; ++i) {
-            DateTime published = null;
-            if((before == null && after == null) || before == null) {
-                published = DateTime.now(); // no date range or end time date 
range so just make the time now.
-            } else if(after == null) {
-                published = before.minusMillis(randomIntBetween(1, 
Integer.MAX_VALUE)); //no beginning to range
-            } else { // has to be in range
-                long range = before.getMillis() - after.getMillis();
-                published = after.plus(range / 2); //in the middle
-            }
-            Activity activity = createActivityWithPublishedDate(published);
-            activity.setTitle(IN_RANGE_IDENTIFIER);
-            list.add(activity);
-        }
-        for(int i=0; i < numBefore; ++i) {
-            DateTime published = after.minusMillis(randomIntBetween(1, 
Integer.MAX_VALUE));
-            Activity activity = createActivityWithPublishedDate(published);
-            list.add(activity);
-        }
-        if(page) {
-            feed.setNextPageToken("A");
-        } else {
-            feed.setNextPageToken(null);
-        }
-        feed.setItems(list);
-        return feed;
+  private static ActivityFeed createMockActivityFeed(
+      int numBefore,
+      int numAfter,
+      int numInRange,
+      DateTime after,
+      DateTime before,
+      boolean page) {
+    ActivityFeed feed = new ActivityFeed();
+    List<Activity> list = Lists.newLinkedList();
+    for (int i = 0; i < numAfter; ++i) {
+      DateTime published = before.plus(randomIntBetween(0, Integer.MAX_VALUE));
+      Activity activity = createActivityWithPublishedDate(published);
+      list.add(activity);
     }
-
-    private static Activity createActivityWithPublishedDate(DateTime dateTime) 
{
-        Activity activity = new Activity();
-        activity.setPublished(new 
com.google.api.client.util.DateTime(dateTime.getMillis()));
-        activity.setId("a");
-        return activity;
+    for (int i = 0; i < numInRange; ++i) {
+      DateTime published = null;
+      if ((before == null && after == null) || before == null) {
+        published = DateTime.now(); // no date range, or no end date, so 
just make the time now.
+      } else if (after == null) {
+        published = before.minusMillis(randomIntBetween(1, 
Integer.MAX_VALUE)); //no beginning to range
+      } else { // has to be in range
+        long range = before.getMillis() - after.getMillis();
+        published = after.plus(range / 2); //in the middle
+      }
+      Activity activity = createActivityWithPublishedDate(published);
+      activity.setTitle(IN_RANGE_IDENTIFIER);
+      list.add(activity);
     }
+    for (int i = 0; i < numBefore; ++i) {
+      DateTime published = after.minusMillis(randomIntBetween(1, 
Integer.MAX_VALUE));
+      Activity activity = createActivityWithPublishedDate(published);
+      list.add(activity);
+    }
+    if (page) {
+      feed.setNextPageToken("A");
+    } else {
+      feed.setNextPageToken(null);
+    }
+    feed.setItems(list);
+    return feed;
+  }
 
-    private static class ActivityFeedAnswer implements Answer<ActivityFeed> {
-        private int afterCount = 0;
-        private int beforeCount = 0;
-        private int inCount = 0;
-        private int maxBatch = 100;
+  private static Activity createActivityWithPublishedDate(DateTime dateTime) {
+    Activity activity = new Activity();
+    activity.setPublished(new 
com.google.api.client.util.DateTime(dateTime.getMillis()));
+    activity.setId("a");
+    return activity;
+  }
 
-        private int numAfter;
-        private int numInRange;
-        private int numBefore;
-        private DateTime after;
-        private DateTime before;
+  private static class ActivityFeedAnswer implements Answer<ActivityFeed> {
+    private int afterCount = 0;
+    private int beforeCount = 0;
+    private int inCount = 0;
+    private int maxBatch = 100;
 
-        private ActivityFeedAnswer(int numBefore, int numAfter, int 
numInRange, DateTime after, DateTime before) {
-            this.numBefore = numBefore;
-            this.numAfter = numAfter;
-            this.numInRange = numInRange;
-            this.after = after;
-            this.before = before;
-        }
+    private int numAfter;
+    private int numInRange;
+    private int numBefore;
+    private DateTime after;
+    private DateTime before;
 
+    private ActivityFeedAnswer(int numBefore, int numAfter, int numInRange, 
DateTime after, DateTime before) {
+      this.numBefore = numBefore;
+      this.numAfter = numAfter;
+      this.numInRange = numInRange;
+      this.after = after;
+      this.before = before;
+    }
 
 
 
-        @Override
-        public ActivityFeed answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-            int totalCount = 0;
-            int batchAfter = 0;
-            int batchBefore = 0;
-            int batchIn = 0;
-            if(afterCount != numAfter) {
-                if(numAfter - afterCount >= maxBatch) {
-                    afterCount += maxBatch;
-                    batchAfter += maxBatch;
-                    totalCount += batchAfter;
-                } else {
-                    batchAfter += numAfter - afterCount;
-                    totalCount += numAfter - afterCount;
-                    afterCount = numAfter;
-                }
-            }
-            if(totalCount < maxBatch && inCount != numInRange) {
-                if(numInRange - inCount >= maxBatch - totalCount) {
-                    inCount += maxBatch - totalCount;
-                    batchIn += maxBatch - totalCount;
-                    totalCount += batchIn;
-                } else {
-                    batchIn += numInRange - inCount;
-                    totalCount += numInRange - inCount;
-                    inCount = numInRange;
-                }
-            }
-            if(totalCount < maxBatch && beforeCount != numBefore) {
-                if(numBefore - batchBefore >= maxBatch - totalCount) {
-                    batchBefore += maxBatch - totalCount;
-                    totalCount = maxBatch;
-                    beforeCount +=batchBefore;
-                } else {
-                    batchBefore += numBefore - beforeCount;
-                    totalCount += numBefore - beforeCount;
-                    beforeCount = numBefore;
-                }
-            }
 
-            return createMockActivityFeed(batchBefore, batchAfter, batchIn, 
after, before, numAfter != afterCount || inCount != numInRange || beforeCount 
!= numBefore);
+    @Override
+    public ActivityFeed answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+      int totalCount = 0;
+      int batchAfter = 0;
+      int batchBefore = 0;
+      int batchIn = 0;
+      if (afterCount != numAfter) {
+        if (numAfter - afterCount >= maxBatch) {
+          afterCount += maxBatch;
+          batchAfter += maxBatch;
+          totalCount += batchAfter;
+        } else {
+          batchAfter += numAfter - afterCount;
+          totalCount += numAfter - afterCount;
+          afterCount = numAfter;
         }
+      }
+      if (totalCount < maxBatch && inCount != numInRange) {
+        if (numInRange - inCount >= maxBatch - totalCount) {
+          inCount += maxBatch - totalCount;
+          batchIn += maxBatch - totalCount;
+          totalCount += batchIn;
+        } else {
+          batchIn += numInRange - inCount;
+          totalCount += numInRange - inCount;
+          inCount = numInRange;
+        }
+      }
+      if (totalCount < maxBatch && beforeCount != numBefore) {
+        if (numBefore - batchBefore >= maxBatch - totalCount) {
+          batchBefore += maxBatch - totalCount;
+          totalCount = maxBatch;
+          beforeCount += batchBefore;
+        } else {
+          batchBefore += numBefore - beforeCount;
+          totalCount += numBefore - beforeCount;
+          beforeCount = numBefore;
+        }
+      }
+
+      return createMockActivityFeed(
+          batchBefore,
+          batchAfter,
+          batchIn,
+          after,
+          before,
+          numAfter != afterCount || inCount != numInRange || beforeCount != 
numBefore);
     }
+  }
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java
index 1251b9a..4460fb1 100644
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java
@@ -19,13 +19,15 @@
 
 package com.google.gplus.provider;
 
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.services.plus.Plus;
-import com.google.api.services.plus.model.Person;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.google.gplus.configuration.UserInfo;
 import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
 import 
org.apache.streams.util.api.requests.backoff.impl.ConstantTimeBackOffStrategy;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.services.plus.Plus;
+import com.google.api.services.plus.model.Person;
+
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -40,110 +42,106 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
- * Basic Units for {@link com.google.gplus.provider.GPlusUserDataCollector}
+ * Basic unit tests for {@link com.google.gplus.provider.GPlusUserDataCollector}.
  */
 public class TestGPlusUserDataCollector {
 
-    private static final String NO_ERROR = "no error";
-
-
-    /**
-     * Test that on success a datum will be added to the queue.
-     * @throws Exception
-     */
-    @Test
-    public void testSucessfullPull() throws Exception {
-        Plus plus = createMockPlus(0, null);
-        BackOffStrategy backOff = new ConstantTimeBackOffStrategy(1);
-        BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>();
-        UserInfo user = new UserInfo();
-        user.setUserId("A");
-
-        GPlusUserDataCollector collector = new GPlusUserDataCollector(plus, 
backOff, datums, user);
-        collector.run();
-
-        assertEquals(1, datums.size());
-        StreamsDatum datum = datums.take();
-        assertNotNull(datum);
-        assertEquals(NO_ERROR, datum.getId());
-        assertNotNull(datum.getDocument());
-        assertTrue(datum.getDocument() instanceof String);
-    }
-
-    /**
-     * Test that on failure, no datums are output
-     * @throws Exception
-     */
-    @Test
-    public void testFail() throws Exception {
-        Plus plus = createMockPlus(3, mock(GoogleJsonResponseException.class));
-        UserInfo user = new UserInfo();
-        user.setUserId("A");
-        BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>();
-        BackOffStrategy backOffStrategy = new ConstantTimeBackOffStrategy(1);
-
-        GPlusUserDataCollector collector = new GPlusUserDataCollector(plus, 
backOffStrategy, datums, user);
-        collector.run();
-
-        assertEquals(0, datums.size());
-    }
-
-
-
-    private Plus createMockPlus(final int succedOnTry, final Throwable 
throwable) {
-        Plus plus = mock(Plus.class);
-        doAnswer(new Answer() {
-            @Override
-            public Plus.People answer(InvocationOnMock invocationOnMock) 
throws Throwable {
-                return createMockPeople(succedOnTry, throwable);
-            }
-        }).when(plus).people();
-        return plus;
-    }
-
-    private Plus.People createMockPeople(final int succedOnTry, final 
Throwable throwable) {
-        Plus.People people = mock(Plus.People.class);
-        try {
-            when(people.get(anyString())).thenAnswer(new 
Answer<Plus.People.Get>() {
-                @Override
-                public Plus.People.Get answer(InvocationOnMock 
invocationOnMock) throws Throwable {
-                    return createMockGetNoError(succedOnTry, throwable);
-                }
-            });
-        } catch (IOException ioe) {
-            fail("No Excpetion should have been thrown while creating mocks");
+  private static final String NO_ERROR = "no error";
+
+  /**
+   * Test that on success a single datum carrying the mocked person's id is added to the queue.
+   * @throws Exception on any unexpected error (e.g. interruption while taking from the queue)
+   */
+  @Test
+  public void testSucessfullPull() throws Exception {
+    Plus plus = createMockPlus(0, null);
+    BackOffStrategy backOff = new ConstantTimeBackOffStrategy(1);
+    BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>();
+    UserInfo user = new UserInfo();
+    user.setUserId("A");
+
+    GPlusUserDataCollector collector = new GPlusUserDataCollector(plus, 
backOff, datums, user);
+    collector.run();
+
+    assertEquals(1, datums.size());
+    StreamsDatum datum = datums.take();
+    assertNotNull(datum);
+    assertEquals(NO_ERROR, datum.getId());
+    assertNotNull(datum.getDocument());
+    assertTrue(datum.getDocument() instanceof String);
+  }
+
+  /**
+   * Test that on failure, no datums are output.
+   * @throws Exception on any unexpected error, which fails the test
+   */
+  @Test
+  public void testFail() throws Exception {
+    Plus plus = createMockPlus(3, mock(GoogleJsonResponseException.class));
+    UserInfo user = new UserInfo();
+    user.setUserId("A");
+    BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>();
+    BackOffStrategy backOffStrategy = new ConstantTimeBackOffStrategy(1);
+
+    GPlusUserDataCollector collector = new GPlusUserDataCollector(plus, 
backOffStrategy, datums, user);
+    collector.run();
+
+    assertEquals(0, datums.size());
+  }
+
+  private Plus createMockPlus(final int succedOnTry, final Throwable 
throwable) {
+    Plus plus = mock(Plus.class);
+    doAnswer(new Answer() {
+      @Override
+      public Plus.People answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+        return createMockPeople(succedOnTry, throwable);
+      }
+    }).when(plus).people();
+    return plus;
+  }
+
+  private Plus.People createMockPeople(final int succedOnTry, final Throwable 
throwable) {
+    Plus.People people = mock(Plus.People.class);
+    try {
+      when(people.get(anyString())).thenAnswer(new Answer<Plus.People.Get>() {
+        @Override
+        public Plus.People.Get answer(InvocationOnMock invocationOnMock) 
throws Throwable {
+          return createMockGetNoError(succedOnTry, throwable);
         }
-        return people;
+      });
+    } catch (IOException ioe) {
+      fail("No Excpetion should have been thrown while creating mocks");
     }
-
-    private Plus.People.Get createMockGetNoError(final int succedOnTry, final 
Throwable throwable) {
-        Plus.People.Get get = mock(Plus.People.Get.class);
-        try {
-            doAnswer(new Answer() {
-                private int counter =0;
-
-                @Override
-                public Person answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                    if(counter == succedOnTry) {
-                        Person p = new Person();
-                        p.setId(NO_ERROR);
-                        return p;
-                    } else {
-                        ++counter;
-                        throw throwable;
-                    }
-                }
-            }).when(get).execute();
-        } catch (IOException ioe) {
-            fail("No Excpetion should have been thrown while creating mocks");
+    return people;
+  }
+
+  private Plus.People.Get createMockGetNoError(final int succedOnTry, final 
Throwable throwable) {
+    Plus.People.Get get = mock(Plus.People.Get.class);
+    try {
+      doAnswer(new Answer() {
+        private int counter = 0;
+
+        @Override
+        public Person answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+          if (counter == succedOnTry) {
+            Person person = new Person();
+            person.setId(NO_ERROR);
+            return person;
+          } else {
+            ++counter;
+            throw throwable;
+          }
         }
-        return get;
+      }).when(get).execute();
+    } catch (IOException ioe) {
+      fail("No Excpetion should have been thrown while creating mocks");
     }
+    return get;
+  }
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java
index 8b4c29b..96a9d89 100644
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java
@@ -18,53 +18,58 @@
 
 package com.google.gplus.serializer.util;
 
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.api.services.plus.model.Activity;
 import com.google.api.services.plus.model.Person;
-import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * GPlusEventClassifierTest tests GPlusEventClassifier.
+ */
 public class GPlusEventClassifierTest {
-    private static StreamsJacksonMapper mapper = 
StreamsJacksonMapper.getInstance();
 
-    @Test
-    public void classifyActivityTest() {
-        try {
-            Activity activity = new Activity();
-            activity.setKind("plus#activity");
-            Class retClass = 
GPlusEventClassifier.detectClass(mapper.writeValueAsString(activity));
+  private static StreamsJacksonMapper mapper = 
StreamsJacksonMapper.getInstance();
+
+  @Test
+  public void classifyActivityTest() {
+    try {
+      Activity activity = new Activity();
+      activity.setKind("plus#activity");
+      Class retClass = 
GPlusEventClassifier.detectClass(mapper.writeValueAsString(activity));
 
-            assertEquals(retClass, Activity.class);
-        } catch(Exception e) {
-            //
-        }
+      assertEquals(retClass, Activity.class);
+    } catch (Exception ex) {
+      // NOTE(review): swallowed - a throw from detectClass/writeValueAsString lets the test pass.
     }
+  }
 
-    @Test
-    public void classifyPersonTest() {
-        try {
-            Person person = new Person();
-            person.setKind("plus#person");
-            Class retClass = 
GPlusEventClassifier.detectClass(mapper.writeValueAsString(person));
+  @Test
+  public void classifyPersonTest() {
+    try {
+      Person person = new Person();
+      person.setKind("plus#person");
+      Class retClass = 
GPlusEventClassifier.detectClass(mapper.writeValueAsString(person));
 
-            assertEquals(retClass, Person.class);
-        } catch(Exception e) {
-            //
-        }
+      assertEquals(retClass, Person.class);
+    } catch (Exception ex) {
+      // NOTE(review): swallowed - a throw from detectClass/writeValueAsString lets the test pass.
     }
+  }
 
-    @Test
-    public void classifObjectNodeTest() {
-        try {
-            Person person = new Person();
-            person.setKind("fake");
-            Class retClass = 
GPlusEventClassifier.detectClass(mapper.writeValueAsString(person));
+  @Test
+  public void classifyObjectNodeTest() {
+    try {
+      Person person = new Person();
+      person.setKind("fake");
+      Class retClass = 
GPlusEventClassifier.detectClass(mapper.writeValueAsString(person));
 
-            assertEquals(retClass, ObjectNode.class);
-        } catch(Exception e) {
-            //
-        }
+      assertEquals(retClass, ObjectNode.class);
+    } catch (Exception ex) {
+      // NOTE(review): swallowed - a throw from detectClass/writeValueAsString lets the test pass.
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java
index 51caa64..4b642ab 100644
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java
@@ -29,39 +29,39 @@ import java.io.LineNumberReader;
 
 public class GPlusUserActivityProviderIT {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(GPlusUserActivityProviderIT.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GPlusUserActivityProviderIT.class);
 
-    @Test
-    public void testGPlusUserActivityProvider() throws Exception {
+  @Test
+  public void testGPlusUserActivityProvider() throws Exception {
 
-        String configfile = 
"./target/test-classes/GPlusUserActivityProviderIT.conf";
-        String outfile = 
"./target/test-classes/GPlusUserActivityProviderIT.stdout.txt";
+    String configfile = 
"./target/test-classes/GPlusUserActivityProviderIT.conf";
+    String outfile = 
"./target/test-classes/GPlusUserActivityProviderIT.stdout.txt";
 
-        String[] args = new String[2];
-        args[0] = configfile;
-        args[1] = outfile;
+    String[] args = new String[2];
+    args[0] = configfile;
+    args[1] = outfile;
 
-        Thread testThread = new Thread((Runnable) () -> {
-            try {
-                GPlusUserActivityProvider.main(args);
-            } catch( Exception e ) {
-                LOGGER.error("Test Exception!", e);
-            }
-        });
-        testThread.start();
-        testThread.join(60000);
+    Thread testThread = new Thread((Runnable) () -> {
+      try {
+        GPlusUserActivityProvider.main(args);
+      } catch ( Exception ex ) {
+        LOGGER.error("Test Exception!", ex);
+      }
+    });
+    testThread.start();
+    testThread.join(60000);
 
-        File out = new File(outfile);
-        assert (out.exists());
-        assert (out.canRead());
-        assert (out.isFile());
+    File out = new File(outfile);
+    assert (out.exists());
+    assert (out.canRead());
+    assert (out.isFile());
 
-        FileReader outReader = new FileReader(out);
-        LineNumberReader outCounter = new LineNumberReader(outReader);
+    FileReader outReader = new FileReader(out);
+    LineNumberReader outCounter = new LineNumberReader(outReader);
 
-        while(outCounter.readLine() != null) {}
+    while (outCounter.readLine() != null) {}
 
-        assert (outCounter.getLineNumber() >= 1);
+    assert (outCounter.getLineNumber() >= 1);
 
-    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java
index b367baa..fd4ddd5 100644
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java
@@ -30,41 +30,41 @@ import java.io.LineNumberReader;
 
 public class GPlusUserDataProviderIT {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(GPlusUserDataProviderIT.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GPlusUserDataProviderIT.class);
 
-    @Test
-    public void testGPlusUserDataProvider() throws Exception {
+  @Test
+  public void testGPlusUserDataProvider() throws Exception {
 
-        String configfile = 
"./target/test-classes/GPlusUserDataProviderIT.conf";
-        String outfile = 
"./target/test-classes/GPlusUserDataProviderIT.stdout.txt";
+    String configfile = "./target/test-classes/GPlusUserDataProviderIT.conf";
+    String outfile = 
"./target/test-classes/GPlusUserDataProviderIT.stdout.txt";
 
-        String[] args = new String[2];
-        args[0] = configfile;
-        args[1] = outfile;
+    String[] args = new String[2];
+    args[0] = configfile;
+    args[1] = outfile;
 
-        Thread testThread = new Thread((Runnable) () -> {
-            try {
-                GPlusUserDataProvider.main(args);
-            } catch( Exception e ) {
-                LOGGER.error("Test Exception!", e);
-            }
-        });
-        testThread.start();
-        testThread.join(60000);
+    Thread testThread = new Thread((Runnable) () -> {
+      try {
+        GPlusUserDataProvider.main(args);
+      } catch ( Exception ex ) {
+        LOGGER.error("Test Exception!", ex);
+      }
+    });
+    testThread.start();
+    testThread.join(60000);
 
-        GPlusUserDataProvider.main(Lists.newArrayList(configfile, 
outfile).toArray(new String[2]));
+    GPlusUserDataProvider.main(Lists.newArrayList(configfile, 
outfile).toArray(new String[2]));
 
-        File out = new File(outfile);
-        assert (out.exists());
-        assert (out.canRead());
-        assert (out.isFile());
+    File out = new File(outfile);
+    assert (out.exists());
+    assert (out.canRead());
+    assert (out.isFile());
 
-        FileReader outReader = new FileReader(out);
-        LineNumberReader outCounter = new LineNumberReader(outReader);
+    FileReader outReader = new FileReader(out);
+    LineNumberReader outCounter = new LineNumberReader(outReader);
 
-        while(outCounter.readLine() != null) {}
+    while (outCounter.readLine() != null) {}
 
-        assert (outCounter.getLineNumber() >= 1);
+    assert (outCounter.getLineNumber() >= 1);
 
-    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
index 17af5f6..6fd6b4e 100644
--- 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
+++ 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
@@ -18,88 +18,91 @@
 
 package org.apache.streams.instagram.processor;
 
-import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.instagram.serializer.InstagramMediaFeedDataConverter;
 import org.apache.streams.instagram.serializer.InstagramUserInfoDataConverter;
-import org.apache.streams.instagram.serializer.util.InstagramActivityUtil;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
+
+import com.google.common.collect.Lists;
 import org.jinstagram.entity.users.basicinfo.UserInfoData;
 import org.jinstagram.entity.users.feed.MediaFeedData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.Queue;
 
 /**
- * This is deprecated - use ActivityConverterProcessor or 
ActivityObjectConverterProcessor
+ * This is deprecated - use ActivityConverterProcessor or
+ * ActivityObjectConverterProcessor.
  */
 @Deprecated
 public class InstagramTypeConverter implements StreamsProcessor {
 
-    public final static String STREAMS_ID = "InstagramTypeConverter";
+  public static final String STREAMS_ID = "InstagramTypeConverter";
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(InstagramTypeConverter.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(InstagramTypeConverter.class);
 
-    private InstagramMediaFeedDataConverter mediaFeedDataConverter;
-    private InstagramUserInfoDataConverter userInfoDataConverter;
+  private InstagramMediaFeedDataConverter mediaFeedDataConverter;
+  private InstagramUserInfoDataConverter userInfoDataConverter;
 
-    public final static String TERMINATE = new String("TERMINATE");
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  public static final String TERMINATE = new String("TERMINATE");
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-        StreamsDatum result = null;
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
 
-        try {
-            Object item = entry.getDocument();
+    StreamsDatum result = null;
 
-            LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
-            if(item instanceof MediaFeedData) {
+    try {
+      Object item = entry.getDocument();
 
-                //We don't need to use the mapper, since we have a process to 
convert between
-                //MediaFeedData objects and Activity objects already
-                List<Activity> activity = 
mediaFeedDataConverter.toActivityList((MediaFeedData)item);
+      LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
+      if (item instanceof MediaFeedData) {
 
-                if( activity.size() > 0 ) result = new StreamsDatum(activity);
+        // No mapper is needed here: mediaFeedDataConverter already converts
+        // MediaFeedData objects into Activity objects.
+        List<Activity> activity = 
mediaFeedDataConverter.toActivityList((MediaFeedData)item);
 
-            } else if(item instanceof UserInfoData) {
-
-                ActivityObject activityObject = 
userInfoDataConverter.toActivityObject((UserInfoData)item);
+        if ( activity.size() > 0 ) {
+          result = new StreamsDatum(activity);
+        }
 
-                if( activityObject != null ) result = new 
StreamsDatum(activityObject);
+      } else if (item instanceof UserInfoData) {
 
-            }
+        ActivityObject activityObject = 
userInfoDataConverter.toActivityObject((UserInfoData)item);
 
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.error("Exception while converting item: {}", 
e.getMessage());
+        if ( activityObject != null ) {
+          result = new StreamsDatum(activityObject);
         }
 
-        if( result != null ) {
-            return Lists.newArrayList(result);
-        } else
-            return Lists.newArrayList();
-
-    }
+      }
 
-    @Override
-    public void prepare(Object o) {
-        mediaFeedDataConverter = new InstagramMediaFeedDataConverter();
-        userInfoDataConverter = new InstagramUserInfoDataConverter();
+    } catch (Exception ex) {
+      ex.printStackTrace();
+      LOGGER.error("Exception while converting item: {}", ex.getMessage());
     }
 
-    @Override
-    public void cleanUp() {
-        //noop
+    if ( result != null ) {
+      return Lists.newArrayList(result);
+    } else {
+      return Lists.newArrayList();
     }
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    mediaFeedDataConverter = new InstagramMediaFeedDataConverter();
+    userInfoDataConverter = new InstagramUserInfoDataConverter();
+  }
+
+  @Override
+  public void cleanUp() {
+    //noop
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
index 0c7ba95..025af18 100644
--- 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
+++ 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
@@ -12,12 +12,9 @@ software distributed under the License is distributed on an
 KIND, either express or implied. See the License for the
 specific language governing permissions and limitations
 under the License. */
+
 package org.apache.streams.instagram.provider;
 
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
@@ -28,6 +25,11 @@ import org.apache.streams.instagram.User;
 import org.apache.streams.instagram.UsersInfo;
 import org.apache.streams.util.ComponentUtils;
 import org.apache.streams.util.SerializationUtil;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,178 +55,172 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public abstract class InstagramAbstractProvider implements StreamsProvider {
 
-    public static final String STREAMS_ID = "InstagramAbstractProvider";
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(InstagramAbstractProvider.class);
-
-    private static final int MAX_BATCH_SIZE = 2000;
-
-    protected InstagramConfiguration config;
-    protected Queue<StreamsDatum> dataQueue;
-    private ListeningExecutorService executorService;
-
-    private List<ListenableFuture<Object>> futures = new ArrayList<>();
-
-    private AtomicBoolean isCompleted;
-
-    public InstagramAbstractProvider() {
-        this.config = new ComponentConfigurator<>(InstagramConfiguration.class)
-          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("instagram"));
-    }
-
-    public InstagramAbstractProvider(InstagramConfiguration config) {
-        this.config = SerializationUtil.cloneBySerialization(config);
-    }
-
-    public static ExecutorService newFixedThreadPoolWithQueueSize(int 
nThreads, int queueSize) {
-        return new ThreadPoolExecutor(nThreads, nThreads,
-                5000L, TimeUnit.MILLISECONDS,
-                new ArrayBlockingQueue<Runnable>(queueSize, true), new 
ThreadPoolExecutor.CallerRunsPolicy());
-    }
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
-
-    @Override
-    public void startStream() {
-        InstagramDataCollector dataCollector = getInstagramDataCollector();
-        this.executorService = 
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
-        ListenableFuture future = this.executorService.submit(dataCollector);
-        this.futures.add(future);
-        executorService.shutdown();
-    }
-
-    /**
-     * Return the data collector to use to connect to instagram.
-     * @return {@link InstagramDataCollector}
-     */
-    protected abstract InstagramDataCollector getInstagramDataCollector();
-
-
-    @Override
-    public StreamsResultSet readCurrent() {
-        Queue<StreamsDatum> batch = new ConcurrentLinkedQueue<>();
-        int count = 0;
-        while(!this.dataQueue.isEmpty() && count < MAX_BATCH_SIZE) {
-            
ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.dataQueue),
 batch);
-            ++count;
-        }
-        return new StreamsResultSet(batch);
-    }
-
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
-    }
-
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        this.dataQueue = new ConcurrentLinkedQueue<>();
-        this.isCompleted = new AtomicBoolean(false);
-    }
-
-    @Override
-    public void cleanUp() {
-        try {
-            ComponentUtils.shutdownExecutor(this.executorService, 5, 5);
-        } finally {
-            this.executorService = null;
-        }
-    }
-
-    /**
-     * Add default start and stop points if necessary.
-     */
-    private void updateUserInfoList() {
-        UsersInfo usersInfo = this.config.getUsersInfo();
-        if(usersInfo.getDefaultAfterDate() == null && 
usersInfo.getDefaultBeforeDate() == null) {
-            return;
-        }
-        DateTime defaultAfterDate = usersInfo.getDefaultAfterDate();
-        DateTime defaultBeforeDate = usersInfo.getDefaultBeforeDate();
-        for(User user : usersInfo.getUsers()) {
-            if(defaultAfterDate != null && user.getAfterDate() == null) {
-                user.setAfterDate(defaultAfterDate);
-            }
-            if(defaultBeforeDate != null && user.getBeforeDate() == null) {
-                user.setBeforeDate(defaultBeforeDate);
-            }
-        }
-    }
-
-    /**
-     * Overrides the client id in the configuration.
-     * @param clientId client id to use
-     */
-    public void setInstagramClientId(String clientId) {
-        this.config.setClientId(clientId);
-    }
-
-    /**
-     * Overrides authroized user tokens in the configuration.
-     * @param tokenStrings
-     */
-    public void setAuthorizedUserTokens(Collection<String> tokenStrings) {
-        ensureUsersInfo(this.config).setAuthorizedTokens(new 
HashSet<>(tokenStrings));
-    }
-
-    /**
-     * Overrides the default before date in the configuration
-     * @param beforeDate
-     */
-    public void setDefaultBeforeDate(DateTime beforeDate) {
-        ensureUsersInfo(this.config).setDefaultBeforeDate(beforeDate);
-    }
-
-    /**
-     * Overrides the default after date in the configuration
-     * @param afterDate
-     */
-    public void setDefaultAfterDate(DateTime afterDate) {
-        ensureUsersInfo(this.config).setDefaultAfterDate(afterDate);
-    }
-
-    /**
-     * Overrides the users in the configuration and sets the after date for 
each user. A NULL DateTime implies
-     * pull data from as early as possible.  If default before or after 
DateTimes are set, they will applied to all
-     * NULL DateTimes.
-     * @param usersWithAfterDate instagram user id mapped to BeforeDate time
-     */
-    public void setUsersWithAfterDate(Map<String, DateTime> 
usersWithAfterDate) {
-        Set<User> users = new HashSet<>();
-        for(String userId : usersWithAfterDate.keySet()) {
-            User user = new User();
-            user.setUserId(userId);
-            user.setAfterDate(usersWithAfterDate.get(userId));
-            users.add(user);
-        }
-        ensureUsersInfo(this.config).setUsers(users);
-    }
-
-    private UsersInfo ensureUsersInfo(InstagramConfiguration config) {
-        UsersInfo usersInfo = config.getUsersInfo();
-        if(usersInfo == null) {
-            usersInfo = new UsersInfo();
-            config.setUsersInfo(usersInfo);
-        }
-        return usersInfo;
-    }
-
-    @Override
-    public boolean isRunning() {
-        if (dataQueue.isEmpty() && executorService.isTerminated() && 
Futures.allAsList(futures).isDone()) {
-            LOGGER.info("Completed");
-            isCompleted.set(true);
-            LOGGER.info("Exiting");
-        }
-        return !isCompleted.get();
-    }
+  public static final String STREAMS_ID = "InstagramAbstractProvider";
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(InstagramAbstractProvider.class);
+
+  private static final int MAX_BATCH_SIZE = 2000;
+
+  protected InstagramConfiguration config;
+  protected Queue<StreamsDatum> dataQueue;
+  private ListeningExecutorService executorService;
+
+  private List<ListenableFuture<Object>> futures = new ArrayList<>();
+
+  private AtomicBoolean isCompleted;
+
+  public InstagramAbstractProvider() {
+    this.config = new ComponentConfigurator<>(InstagramConfiguration.class)
+        
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("instagram"));
+  }
+
+  public InstagramAbstractProvider(InstagramConfiguration config) {
+    this.config = SerializationUtil.cloneBySerialization(config);
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void startStream() {
+    InstagramDataCollector dataCollector = getInstagramDataCollector();
+    this.executorService = 
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+    ListenableFuture future = this.executorService.submit(dataCollector);
+    this.futures.add(future);
+    executorService.shutdown();
+  }
+
+  /**
+   * Return the data collector to use to connect to instagram.
+   * @return {@link InstagramDataCollector}
+   */
+  protected abstract InstagramDataCollector getInstagramDataCollector();
+
+
+  @Override
+  public StreamsResultSet readCurrent() {
+    Queue<StreamsDatum> batch = new ConcurrentLinkedQueue<>();
+    int count = 0;
+    while (!this.dataQueue.isEmpty() && count < MAX_BATCH_SIZE) {
+      
ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.dataQueue),
 batch);
+      ++count;
+    }
+    return new StreamsResultSet(batch);
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    return null;
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    return null;
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    this.dataQueue = new ConcurrentLinkedQueue<>();
+    this.isCompleted = new AtomicBoolean(false);
+  }
+
+  @Override
+  public void cleanUp() {
+    try {
+      ComponentUtils.shutdownExecutor(this.executorService, 5, 5);
+    } finally {
+      this.executorService = null;
+    }
+  }
+
+  /**
+   * Add default start and stop points if necessary.
+   */
+  private void updateUserInfoList() {
+    UsersInfo usersInfo = this.config.getUsersInfo();
+    if (usersInfo.getDefaultAfterDate() == null && 
usersInfo.getDefaultBeforeDate() == null) {
+      return;
+    }
+    DateTime defaultAfterDate = usersInfo.getDefaultAfterDate();
+    DateTime defaultBeforeDate = usersInfo.getDefaultBeforeDate();
+    for (User user : usersInfo.getUsers()) {
+      if (defaultAfterDate != null && user.getAfterDate() == null) {
+        user.setAfterDate(defaultAfterDate);
+      }
+      if (defaultBeforeDate != null && user.getBeforeDate() == null) {
+        user.setBeforeDate(defaultBeforeDate);
+      }
+    }
+  }
+
+  /**
+   * Overrides the client id in the configuration.
+   * @param clientId client id to use
+   */
+  public void setInstagramClientId(String clientId) {
+    this.config.setClientId(clientId);
+  }
+
+  /**
+   * Overrides authorized user tokens in the configuration.
+   * @param tokenStrings tokenStrings
+   */
+  public void setAuthorizedUserTokens(Collection<String> tokenStrings) {
+    ensureUsersInfo(this.config).setAuthorizedTokens(new 
HashSet<>(tokenStrings));
+  }
+
+  /**
+   * Overrides the default before date in the configuration.
+   * @param beforeDate beforeDate
+   */
+  public void setDefaultBeforeDate(DateTime beforeDate) {
+    ensureUsersInfo(this.config).setDefaultBeforeDate(beforeDate);
+  }
+
+  /**
+   * Overrides the default after date in the configuration.
+   * @param afterDate afterDate
+   */
+  public void setDefaultAfterDate(DateTime afterDate) {
+    ensureUsersInfo(this.config).setDefaultAfterDate(afterDate);
+  }
+
+  /**
+   * Overrides the users in the configuration and sets the after date for each 
user. A NULL DateTime implies
+   * pull data from as early as possible.  If default before or after 
DateTimes are set, they will be applied to all
+   * NULL DateTimes.
+   * @param usersWithAfterDate instagram user id mapped to AfterDate time
+   */
+  public void setUsersWithAfterDate(Map<String, DateTime> usersWithAfterDate) {
+    Set<User> users = new HashSet<>();
+    for (String userId : usersWithAfterDate.keySet()) {
+      User user = new User();
+      user.setUserId(userId);
+      user.setAfterDate(usersWithAfterDate.get(userId));
+      users.add(user);
+    }
+    ensureUsersInfo(this.config).setUsers(users);
+  }
+
+  private UsersInfo ensureUsersInfo(InstagramConfiguration config) {
+    UsersInfo usersInfo = config.getUsersInfo();
+    if (usersInfo == null) {
+      usersInfo = new UsersInfo();
+      config.setUsersInfo(usersInfo);
+    }
+    return usersInfo;
+  }
+
+  @Override
+  public boolean isRunning() {
+    if (dataQueue.isEmpty() && executorService.isTerminated() && 
Futures.allAsList(futures).isDone()) {
+      LOGGER.info("Completed");
+      isCompleted.set(true);
+      LOGGER.info("Exiting");
+    }
+    return !isCompleted.get();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java
 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java
index 97451f0..1916061 100644
--- 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java
+++ 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java
@@ -12,9 +12,9 @@ software distributed under the License is distributed on an
 KIND, either express or implied. See the License for the
 specific language governing permissions and limitations
 under the License. */
+
 package org.apache.streams.instagram.provider;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.instagram.InstagramConfiguration;
 import org.apache.streams.instagram.User;
@@ -22,13 +22,9 @@ import org.apache.streams.util.ComponentUtils;
 import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
 import 
org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
 import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
-import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger;
+import 
org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManager;
+
 import org.jinstagram.Instagram;
-import org.jinstagram.entity.common.Pagination;
-import org.jinstagram.entity.users.feed.MediaFeed;
-import org.jinstagram.entity.users.feed.MediaFeedData;
-import org.jinstagram.exceptions.InstagramBadRequestException;
-import org.jinstagram.exceptions.InstagramRateLimitException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,102 +40,106 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public abstract class InstagramDataCollector<T> implements Runnable {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(InstagramDataCollector.class);
-
-    protected Queue<StreamsDatum> dataQueue; //exposed for testing
-    private InstagramConfiguration config;
-    private AtomicBoolean isCompleted;
-    private SimpleTokenManager<InstagramOauthToken> tokenManger;
-    protected int consecutiveErrorCount;
-    protected BackOffStrategy backOffStrategy;
-    private Instagram instagram;
-
-
-    public InstagramDataCollector(Queue<StreamsDatum> queue, 
InstagramConfiguration config) {
-        this.dataQueue = queue;
-        this.config = config;
-        this.isCompleted = new AtomicBoolean(false);
-        this.tokenManger = new BasicTokenManger<InstagramOauthToken>();
-        for (String tokens : this.config.getUsersInfo().getAuthorizedTokens()) 
{
-            this.tokenManger.addTokenToPool(new InstagramOauthToken(tokens));
-        }
-        this.consecutiveErrorCount = 0;
-        this.backOffStrategy = new ExponentialBackOffStrategy(2);
-        this.instagram = new Instagram(this.config.getClientId());
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(InstagramDataCollector.class);
+
+  protected Queue<StreamsDatum> dataQueue; //exposed for testing
+  private InstagramConfiguration config;
+  private AtomicBoolean isCompleted;
+  private SimpleTokenManager<InstagramOauthToken> tokenManger;
+  protected int consecutiveErrorCount;
+  protected BackOffStrategy backOffStrategy;
+  private Instagram instagram;
+
+  /**
+   * InstagramDataCollector constructor.
+   * @param queue Queue of StreamsDatum
+   * @param config InstagramConfiguration
+   */
+  public InstagramDataCollector(Queue<StreamsDatum> queue, 
InstagramConfiguration config) {
+    this.dataQueue = queue;
+    this.config = config;
+    this.isCompleted = new AtomicBoolean(false);
+    this.tokenManger = new BasicTokenManager<InstagramOauthToken>();
+    for (String tokens : this.config.getUsersInfo().getAuthorizedTokens()) {
+      this.tokenManger.addTokenToPool(new InstagramOauthToken(tokens));
     }
-
-
-    /**
-     * If there are authorized tokens available, it sets a new token for the 
client and returns
-     * the client.  If there are no available tokens, it simply returns the 
client that was
-     * initialized in the constructor with client id.
-     * @return
-     */
-    protected Instagram getNextInstagramClient() {
-        if(this.tokenManger.numAvailableTokens() > 0) {
-            
this.instagram.setAccessToken(this.tokenManger.getNextAvailableToken());
-        }
-        return this.instagram;
+    this.consecutiveErrorCount = 0;
+    this.backOffStrategy = new ExponentialBackOffStrategy(2);
+    this.instagram = new Instagram(this.config.getClientId());
+  }
+
+
+  /**
+   * If there are authorized tokens available, it sets a new token for the 
client and returns
+   * the client.  If there are no available tokens, it simply returns the 
client that was
+   * initialized in the constructor with client id.
+   * @return result
+   */
+  protected Instagram getNextInstagramClient() {
+    if (this.tokenManger.numAvailableTokens() > 0) {
+      this.instagram.setAccessToken(this.tokenManger.getNextAvailableToken());
     }
-
-    /**
-     * Return the number of available tokens for this data collector
-     * @return numbeer of available tokens
-     */
-    protected int numAvailableTokens() {
-        return this.tokenManger.numAvailableTokens();
+    return this.instagram;
+  }
+
+  /**
+   * Return the number of available tokens for this data collector.
+   * @return number of available tokens
+   */
+  protected int numAvailableTokens() {
+    return this.tokenManger.numAvailableTokens();
+  }
+
+  /**
+   * Queues the Instagram data to be output by the provider.
+   * @param userData data to queue
+   * @param userId user id who the data came from
+   */
+  protected void queueData(Collection<T> userData, String userId) {
+    if (userData == null) {
+      LOGGER.warn("User id, {}, returned a NULL data from instagram.", userId);
+    } else {
+      for (T data : userData) {
+        ComponentUtils.offerUntilSuccess(convertToStreamsDatum(data), 
this.dataQueue);
+      }
     }
-
-    /**
-     * Queues the Instagram data to be output by the provider.
-     * @param userData data to queue
-     * @param userId user id who the data came from
-     */
-    protected void queueData(Collection<T> userData, String userId) {
-        if (userData == null) {
-            LOGGER.warn("User id, {}, returned a NULL data from instagram.", 
userId);
-        } else {
-            for (T data : userData) {
-                ComponentUtils.offerUntilSuccess(convertToStreamsDatum(data), 
this.dataQueue);
-            }
-        }
+  }
+
+  /**
+   * @return true when the collector has queued all of the available Instagram 
data for the provided users.
+   */
+  public boolean isCompleted() {
+    return this.isCompleted.get();
+  }
+
+  @Override
+  public void run() {
+    for (User user : this.config.getUsersInfo().getUsers()) {
+      try {
+        collectInstagramDataForUser(user);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      } catch (Exception ex) {
+        LOGGER.error("Exception thrown while polling for user, {}, skipping 
user.", user.getUserId());
+        LOGGER.error("Exception thrown while polling for user : ", ex);
+      }
     }
-
-    /**
-     * @return true when the collector has queued all of the available 
Instagram data for the provided users.
-     */
-    public boolean isCompleted() {
-        return this.isCompleted.get();
-    }
-
-    @Override
-    public void run() {
-        for (User user : this.config.getUsersInfo().getUsers()) {
-            try {
-                collectInstagramDataForUser(user);
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-            } catch (Exception e) {
-                LOGGER.error("Exception thrown while polling for user, {}, 
skipping user.", user.getUserId());
-                LOGGER.error("Exception thrown while polling for user : ", e);
-            }
-        }
-        this.isCompleted.set(true);
-    }
-
-    /**
-     * Pull instagram data for a user and queues the resulting data.
-     * @param user
-     * @throws Exception
-     */
-    protected abstract void collectInstagramDataForUser(User user) throws 
Exception;
-
-    /**
-     * Takes an Instagram Object and sets it as the document of a streams 
datum and sets the id of the streams datum.
-     * @param item
-     * @return
-     */
-    protected abstract StreamsDatum convertToStreamsDatum(T item);
+    this.isCompleted.set(true);
+  }
+
+  /**
+   * Pull instagram data for a user and queues the resulting data.
+   * @param user
+   * @throws Exception
+   */
+  protected abstract void collectInstagramDataForUser(User user) throws 
Exception;
+
+  /**
+   * Takes an Instagram Object and sets it as the document of a streams datum 
and sets the id of the streams datum.
+   * @param item
+   * @return
+   */
+  protected abstract StreamsDatum convertToStreamsDatum(T item);
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
index 4531cfe..959b240 100644
--- 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
+++ 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
@@ -12,8 +12,8 @@ software distributed under the License is distributed on an
 KIND, either express or implied. See the License for the
 specific language governing permissions and limitations
 under the License. */
-package org.apache.streams.instagram.provider;
 
+package org.apache.streams.instagram.provider;
 
 import org.jinstagram.auth.model.Token;
 
@@ -23,21 +23,21 @@ import org.jinstagram.auth.model.Token;
  */
 public class InstagramOauthToken extends Token {
 
-    public InstagramOauthToken(String token) {
-        this(token, null);
-    }
+  public InstagramOauthToken(String token) {
+    this(token, null);
+  }
 
-    public InstagramOauthToken(String token, String secret) {
-        super(token, secret);
-    }
+  public InstagramOauthToken(String token, String secret) {
+    super(token, secret);
+  }
 
-    @Override
-    public boolean equals(Object o) {
-        if(!(o instanceof InstagramOauthToken)) {
-            return false;
-        }
-        InstagramOauthToken that = (InstagramOauthToken) o;
-        return this.getToken().equals(that.getToken());
+  @Override
+  public boolean equals(Object object) {
+    if (!(object instanceof InstagramOauthToken)) {
+      return false;
     }
+    InstagramOauthToken that = (InstagramOauthToken) object;
+    return this.getToken().equals(that.getToken());
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
index e946e6b..b1e4593 100644
--- 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
+++ 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
@@ -12,13 +12,14 @@ software distributed under the License is distributed on an
 KIND, either express or implied. See the License for the
 specific language governing permissions and limitations
 under the License. */
+
 package org.apache.streams.instagram.provider.recentmedia;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.instagram.InstagramConfiguration;
 import org.apache.streams.instagram.User;
 import org.apache.streams.instagram.provider.InstagramDataCollector;
+
 import org.jinstagram.entity.common.Pagination;
 import org.jinstagram.entity.users.feed.MediaFeed;
 import org.jinstagram.entity.users.feed.MediaFeedData;
@@ -37,75 +38,77 @@ import java.util.Queue;
  */
 public class InstagramRecentMediaCollector extends 
InstagramDataCollector<MediaFeedData> {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(InstagramRecentMediaCollector.class);
-    protected static final int MAX_ATTEMPTS = 5;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(InstagramRecentMediaCollector.class);
+  protected static final int MAX_ATTEMPTS = 5;
 
-    private int consecutiveErrorCount;
+  private int consecutiveErrorCount;
 
 
-    public InstagramRecentMediaCollector(Queue<StreamsDatum> queue, 
InstagramConfiguration config) {
-        super(queue, config);
-    }
+  public InstagramRecentMediaCollector(Queue<StreamsDatum> queue, 
InstagramConfiguration config) {
+    super(queue, config);
+  }
 
-    @Override
-    protected StreamsDatum convertToStreamsDatum(MediaFeedData item) {
-        return new StreamsDatum(item, item.getId());
-    }
+  @Override
+  protected StreamsDatum convertToStreamsDatum(MediaFeedData item) {
+    return new StreamsDatum(item, item.getId());
+  }
 
-    /**
-     * Pull Recement Media for a user and queues the resulting data. Will try 
a single call 5 times before failing and
-     * moving on to the next call or returning.
-     * @param user
-     * @throws Exception
-     */
-    @Override
-    protected void collectInstagramDataForUser(User user) throws Exception {
-        Pagination pagination = null;
-        do {
-            int attempts = 0;
-            boolean succesfullDataPull = false;
-            while (!succesfullDataPull && attempts < MAX_ATTEMPTS) {
-                ++attempts;
-                MediaFeed feed = null;
-                try {
-                    if (pagination == null) {
-                        feed = 
getNextInstagramClient().getRecentMediaFeed(user.getUserId(),
-                                0,
-                                null,
-                                null,
-                                user.getBeforeDate() == null ? null : 
user.getBeforeDate().toDate(),
-                                user.getAfterDate() == null ? null : 
user.getAfterDate().toDate());
-                    } else {
-                        feed = 
getNextInstagramClient().getRecentMediaNextPage(pagination);
-                    }
-                } catch (Exception e) {
-                    if(e instanceof InstagramRateLimitException) {
-                        LOGGER.warn("Received rate limit exception from 
Instagram, backing off. : {}", e);
-                        this.backOffStrategy.backOff();
-                    } else if(e instanceof InstagramBadRequestException) {
-                        LOGGER.error("Received Bad Requests exception form 
Instagram: {}", e);
-                        attempts = MAX_ATTEMPTS; //don't repeat bad requests.
-                        ++this.consecutiveErrorCount;
-                    } else {
-                        LOGGER.error("Received Expection while attempting to 
poll Instagram: {}", e);
-                        ++this.consecutiveErrorCount;
-                    }
-                    if(this.consecutiveErrorCount > 
Math.max(this.numAvailableTokens(), MAX_ATTEMPTS*2)) {
-                        throw new Exception("InstagramCollector failed to 
successfully connect to instagram on "+this.consecutiveErrorCount+" attempts.");
-                    }
-                }
-                if(succesfullDataPull = feed != null) {
-                    this.consecutiveErrorCount = 0;
-                    this.backOffStrategy.reset();
-                    pagination = feed.getPagination();
-                    queueData(feed.getData(), user.getUserId());
-                }
-            }
-            if(!succesfullDataPull) {
-                LOGGER.error("Failed to get data from instagram for user id, 
{}, skipping user.", user.getUserId());
-            }
-        } while (pagination != null && pagination.hasNextPage());
+  /**
+   * Pull Recent Media for a user and queues the resulting data. Will try a 
single call 5 times before failing and
+   * moving on to the next call or returning.
+   * @param user user
+   * @throws Exception Exception
+   */
+  @Override
+  protected void collectInstagramDataForUser(User user) throws Exception {
+    Pagination pagination = null;
+    do {
+      int attempts = 0;
+      boolean succesfullDataPull = false;
+      while (!succesfullDataPull && attempts < MAX_ATTEMPTS) {
+        ++attempts;
+        MediaFeed feed = null;
+        try {
+          if (pagination == null) {
+            feed = 
getNextInstagramClient().getRecentMediaFeed(user.getUserId(),
+                0,
+                null,
+                null,
+                user.getBeforeDate() == null ? null : 
user.getBeforeDate().toDate(),
+                user.getAfterDate() == null ? null : 
user.getAfterDate().toDate());
+          } else {
+            feed = getNextInstagramClient().getRecentMediaNextPage(pagination);
+          }
+        } catch (Exception ex) {
+          if ( ex instanceof InstagramRateLimitException) {
+            LOGGER.warn("Received rate limit exception from Instagram, backing 
off. : {}", ex);
+            this.backOffStrategy.backOff();
+          } else if ( ex instanceof InstagramBadRequestException) {
+            LOGGER.error("Received Bad Requests exception form Instagram: {}", 
ex);
+            attempts = MAX_ATTEMPTS; //don't repeat bad requests.
+            ++this.consecutiveErrorCount;
+          } else {
+            LOGGER.error("Received Expection while attempting to poll 
Instagram: {}", ex);
+            ++this.consecutiveErrorCount;
+          }
+          if (this.consecutiveErrorCount > Math.max(this.numAvailableTokens(), 
MAX_ATTEMPTS * 2)) {
+            throw new Exception(
+                "InstagramCollector failed to successfully connect to 
instagram on " + this.consecutiveErrorCount + " attempts.");
+          }
+        }
+        if (succesfullDataPull = feed != null) {
+          this.consecutiveErrorCount = 0;
+          this.backOffStrategy.reset();
+          pagination = feed.getPagination();
+          queueData(feed.getData(), user.getUserId());
+        }
+      }
+      if (!succesfullDataPull) {
+        LOGGER.error("Failed to get data from instagram for user id, {}, 
skipping user.", user.getUserId());
+      }
     }
+    while (pagination != null && pagination.hasNextPage());
+  }
 
 
 }

Reply via email to