http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java index 677b22f..3620346 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java @@ -19,6 +19,12 @@ package com.google.gplus.provider; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.util.api.requests.backoff.BackOffStrategy; +import org.apache.streams.util.api.requests.backoff.impl.ConstantTimeBackOffStrategy; + import com.carrotsearch.randomizedtesting.RandomizedTest; import com.carrotsearch.randomizedtesting.annotations.Repeat; import com.fasterxml.jackson.databind.DeserializationFeature; @@ -29,11 +35,7 @@ import com.google.api.services.plus.model.Activity; import com.google.api.services.plus.model.ActivityFeed; import com.google.common.collect.Lists; import com.google.gplus.serializer.util.GPlusActivityDeserializer; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.google.gplus.configuration.UserInfo; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.util.api.requests.backoff.BackOffStrategy; -import org.apache.streams.util.api.requests.backoff.impl.ConstantTimeBackOffStrategy; + import org.joda.time.DateTime; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -55,231 +57,256 @@ import static org.mockito.Mockito.when; */ public class TestGPlusUserActivityCollector extends RandomizedTest { + private static final String ACTIVITY_TEMPLATE = "{ \"kind\": \"plus#activity\", \"etag\": \"\\\"Vea_b94Y77GDGgRK7gFNPnolKQw/v1-6aVSBGT4qiStMoz7f2_AN2fM\\\"\", \"title\": \"\", \"published\": \"%s\", \"updated\": \"2014-10-27T06:26:33.927Z\", \"id\": \"z13twrlznpvtzz52w22mdt1y0k3of1djw04\", \"url\": \"https://plus.google.com/116771159471120611293/posts/GR7CGR8N5VL\", \"actor\": { \"id\": \"116771159471120611293\", \"displayName\": \"displayName\", \"url\": \"https://plus.google.com/116771159471120611293\", \"image\": { \"url\": \"https://lh6.googleusercontent.com/-C0fiZBxdvw0/AAAAAAAAAAI/AAAAAAAAJ5k/K4pgR3_-_ms/photo.jpg?sz=50\" } }, \"verb\": \"share\", \"object\": { \"objectType\": \"activity\", \"id\": \"z13zgvtiurjgfti1v234iflghvq2c1dge04\", \"actor\": { \"id\": \"104954254300557350002\", \"displayName\": \"displayName\", \"url\": \"https://plus.google.com/104954254300557350002\", \"image\": { \"url\": \"https://lh4.googleusercontent.com/-SO1scj4p2LA/AAAAAAAAAAI/AAAAAAAAI-s/efA 9LBVe144/photo.jpg?sz=50\" } }, \"content\": \"\", \"url\": \"https://plus.google.com/104954254300557350002/posts/AwewXhtn7ws\", \"replies\": { \"totalItems\": 0, \"selfLink\": \"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/comments\" }, \"plusoners\": { \"totalItems\": 9, \"selfLink\": \"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/people/plusoners\" }, \"resharers\": { \"totalItems\": 0, \"selfLink\": \"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/people/resharers\" }, \"attachments\": [ { \"objectType\": \"photo\", \"id\": \"104954254300557350002.6074732746360957410\", \"content\": \"26/10/2014 - 1\", \"url\": \"https://plus.google.com/photos/104954254300557350002/albums/6074732747132702225/6074732746360957410\", \"image\": { \"url\": \"https://lh4.googleusercontent.com/-oO3fnARlDm0/VE3JP1xHKeI/AAAAAAAAeCY/-X2jzc6HruA/w506-h750/2014%2B-%2B1\", \"type\": \"ima ge/jpeg\" }, \"fullImage\": { \"url\": \"https://lh4.googleusercontent.com/-oO3fnARlDm0/VE3JP1xHKeI/AAAAAAAAeCY/-X2jzc6HruA/w600-h1141/2014%2B-%2B1\", \"type\": \"image/jpeg\", \"height\": 1141, \"width\": 600 } } ] }, \"annotation\": \"Truth ð\", \"provider\": { \"title\": \"Reshared Post\" }, \"access\": { \"kind\": \"plus#acl\", \"description\": \"Public\", \"items\": [ { \"type\": \"public\" } ] } }"; + private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + private static final String IN_RANGE_IDENTIFIER = "data in range"; - private static final String ACTIVITY_TEMPLATE = "{ \"kind\": \"plus#activity\", \"etag\": \"\\\"Vea_b94Y77GDGgRK7gFNPnolKQw/v1-6aVSBGT4qiStMoz7f2_AN2fM\\\"\", \"title\": \"\", \"published\": \"%s\", \"updated\": \"2014-10-27T06:26:33.927Z\", \"id\": \"z13twrlznpvtzz52w22mdt1y0k3of1djw04\", \"url\": \"https://plus.google.com/116771159471120611293/posts/GR7CGR8N5VL\", \"actor\": { \"id\": \"116771159471120611293\", \"displayName\": \"displayName\", \"url\": \"https://plus.google.com/116771159471120611293\", \"image\": { \"url\": \"https://lh6.googleusercontent.com/-C0fiZBxdvw0/AAAAAAAAAAI/AAAAAAAAJ5k/K4pgR3_-_ms/photo.jpg?sz=50\" } }, \"verb\": \"share\", \"object\": { \"objectType\": \"activity\", \"id\": \"z13zgvtiurjgfti1v234iflghvq2c1dge04\", \"actor\": { \"id\": \"104954254300557350002\", \"displayName\": \"displayName\", \"url\": \"https://plus.google.com/104954254300557350002\", \"image\": { \"url\": \"https://lh4.googleusercontent.com/-SO1scj4p2LA/AAAAAAAAAAI/AAAAAAAAI-s/e fA9LBVe144/photo.jpg?sz=50\" } }, \"content\": \"\", \"url\": \"https://plus.google.com/104954254300557350002/posts/AwewXhtn7ws\", \"replies\": { \"totalItems\": 0, \"selfLink\": \"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/comments\" }, \"plusoners\": { \"totalItems\": 9, \"selfLink\": \"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/people/plusoners\" }, \"resharers\": { \"totalItems\": 0, \"selfLink\": \"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/people/resharers\" }, \"attachments\": [ { \"objectType\": \"photo\", \"id\": \"104954254300557350002.6074732746360957410\", \"content\": \"26/10/2014 - 1\", \"url\": \"https://plus.google.com/photos/104954254300557350002/albums/6074732747132702225/6074732746360957410\", \"image\": { \"url\": \"https://lh4.googleusercontent.com/-oO3fnARlDm0/VE3JP1xHKeI/AAAAAAAAeCY/-X2jzc6HruA/w506-h750/2014%2B-%2B1\", \"type\": \"i mage/jpeg\" }, \"fullImage\": { \"url\": \"https://lh4.googleusercontent.com/-oO3fnARlDm0/VE3JP1xHKeI/AAAAAAAAeCY/-X2jzc6HruA/w600-h1141/2014%2B-%2B1\", \"type\": \"image/jpeg\", \"height\": 1141, \"width\": 600 } } ] }, \"annotation\": \"Truth ð\", \"provider\": { \"title\": \"Reshared Post\" }, \"access\": { \"kind\": \"plus#acl\", \"description\": \"Public\", \"items\": [ { \"type\": \"public\" } ] } }"; - private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - private static final String IN_RANGE_IDENTIFIER = "data in range"; - + static { + SimpleModule simpleModule = new SimpleModule(); + simpleModule.addDeserializer(Activity.class, new GPlusActivityDeserializer()); + MAPPER.registerModule(simpleModule); + MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } - static { - SimpleModule simpleModule = new SimpleModule(); - simpleModule.addDeserializer(Activity.class, new GPlusActivityDeserializer()); - MAPPER.registerModule(simpleModule); - MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + /** + * Creates a randomized activity and randomized date range. + * + * <p/> + * The activity feed is separated into three chunks, + * |. . . data too recent to be in date range . . .||. . . data in date range. . .||. . . data too old to be in date range| + * [index 0, ............................................................................................., index length-1] + * + * <p/> + * Inside of those chunks data has no order, but the list is ordered by those three chunks. + * + * <p/> + * The test will check to see if the num of data in the date range make onto the output queue. + */ + @Test + @Repeat(iterations = 3) + public void testWithBeforeAndAfterDates() throws InterruptedException { + //initialize counts assuming no date ranges will be used + int numActivities = randomIntBetween(0, 1000); + int numActivitiesInDateRange = numActivities; + int numberOutOfRange = 0; + int numBeforeRange = 0; + int numAfterRange = 0; + //determine if date ranges will be used + DateTime beforeDate = null; + DateTime afterDate = null; + if (randomInt() % 2 == 0) { + beforeDate = DateTime.now().minusDays(randomIntBetween(1,5)); } - - /** - * Creates a randomized activity and randomized date range. - * The activity feed is separated into three chunks, - * |. . . data too recent to be in date range . . .||. . . data in date range. . .||. . . data too old to be in date range| - * [index 0, ............................................................................................., index length-1] - * Inside of those chunks data has no order, but the list is ordered by those three chunks. - * - * The test will check to see if the num of data in the date range make onto the output queue. - */ - @Test - @Repeat(iterations = 3) - public void testWithBeforeAndAfterDates() throws InterruptedException { - //initialize counts assuming no date ranges will be used - int numActivities = randomIntBetween(0, 1000); - int numActivitiesInDateRange = numActivities; - int numberOutOfRange = 0; - int numBerforeRange = 0; - int numAfterRange = 0; - //determine if date ranges will be used - DateTime beforeDate = null; - DateTime afterDate = null; - if(randomInt() % 2 == 0) { - beforeDate = DateTime.now().minusDays(randomIntBetween(1,5)); - } - if(randomInt() % 2 == 0) { - if(beforeDate == null) { - afterDate = DateTime.now().minusDays(randomIntBetween(1, 10)); - } else { - afterDate = beforeDate.minusDays(randomIntBetween(1, 10)); - } - } - //update counts if date ranges are going to be used. - if(beforeDate != null || afterDate != null ) { //assign amount to be in range - numActivitiesInDateRange = randomIntBetween(0, numActivities); - numberOutOfRange = numActivities - numActivitiesInDateRange; - } - if(beforeDate == null && afterDate != null) { //assign all out of range to be before the start of the range - numBerforeRange = numberOutOfRange; - } else if(beforeDate != null && afterDate == null) { //assign all out of range to be after the start of the range - numAfterRange = numberOutOfRange; - } else if(beforeDate != null && afterDate != null) { //assign half before range and half after the range - numAfterRange = (numberOutOfRange / 2) + (numberOutOfRange % 2); - numBerforeRange = numberOutOfRange / 2; - } - - Plus plus = createMockPlus(numBerforeRange, numAfterRange, numActivitiesInDateRange, afterDate, beforeDate); - BackOffStrategy strategy = new ConstantTimeBackOffStrategy(1); - BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>(); - UserInfo userInfo = new UserInfo(); - userInfo.setUserId("A"); - userInfo.setAfterDate(afterDate); - userInfo.setBeforeDate(beforeDate); - GPlusUserActivityCollector collector = new GPlusUserActivityCollector(plus, datums, strategy, userInfo); - collector.run(); - - assertEquals(numActivitiesInDateRange, datums.size()); - while(!datums.isEmpty()) { - StreamsDatum datum = datums.take(); - assertNotNull(datum); - assertNotNull(datum.getDocument()); - assertTrue(datum.getDocument() instanceof String); - assertTrue(((String)datum.getDocument()).contains(IN_RANGE_IDENTIFIER)); //only in range documents are on the out going queue. - } + if (randomInt() % 2 == 0) { + if (beforeDate == null) { + afterDate = DateTime.now().minusDays(randomIntBetween(1, 10)); + } else { + afterDate = beforeDate.minusDays(randomIntBetween(1, 10)); + } + } + //update counts if date ranges are going to be used. + if (beforeDate != null || afterDate != null ) { //assign amount to be in range + numActivitiesInDateRange = randomIntBetween(0, numActivities); + numberOutOfRange = numActivities - numActivitiesInDateRange; + } + if (beforeDate == null && afterDate != null) { //assign all out of range to be before the start of the range + numBeforeRange = numberOutOfRange; + } else if (beforeDate != null && afterDate == null) { //assign all out of range to be after the start of the range + numAfterRange = numberOutOfRange; + } else if (beforeDate != null && afterDate != null) { //assign half before range and half after the range + numAfterRange = (numberOutOfRange / 2) + (numberOutOfRange % 2); + numBeforeRange = numberOutOfRange / 2; } + Plus plus = createMockPlus(numBeforeRange, numAfterRange, numActivitiesInDateRange, afterDate, beforeDate); + BackOffStrategy strategy = new ConstantTimeBackOffStrategy(1); + BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>(); + UserInfo userInfo = new UserInfo(); + userInfo.setUserId("A"); + userInfo.setAfterDate(afterDate); + userInfo.setBeforeDate(beforeDate); + GPlusUserActivityCollector collector = new GPlusUserActivityCollector(plus, datums, strategy, userInfo); + collector.run(); - private Plus createMockPlus(final int numBefore, final int numAfter, final int numInRange, final DateTime after, final DateTime before) { - Plus plus = mock(Plus.class); - final Plus.Activities activities = createMockPlusActivities(numBefore, numAfter, numInRange, after, before); - doAnswer(new Answer() { - @Override - public Plus.Activities answer(InvocationOnMock invocationOnMock) throws Throwable { - return activities; - } - }).when(plus).activities(); - return plus; + assertEquals(numActivitiesInDateRange, datums.size()); + while (!datums.isEmpty()) { + StreamsDatum datum = datums.take(); + assertNotNull(datum); + assertNotNull(datum.getDocument()); + assertTrue(datum.getDocument() instanceof String); + assertTrue(((String)datum.getDocument()).contains(IN_RANGE_IDENTIFIER)); //only in range documents are on the out going queue. } + } - private Plus.Activities createMockPlusActivities(final int numBefore, final int numAfter, final int numInRange, final DateTime after, final DateTime before) { - Plus.Activities activities = mock(Plus.Activities.class); - try { - Plus.Activities.List list = createMockPlusActivitiesList(numBefore, numAfter, numInRange, after, before); - when(activities.list(anyString(), anyString())).thenReturn(list); - } catch (IOException ioe) { - fail("Should not have thrown exception while creating mock. : "+ioe.getMessage()); - } + + private Plus createMockPlus(final int numBefore, final int numAfter, final int numInRange, final DateTime after, final DateTime before) { + Plus plus = mock(Plus.class); + final Plus.Activities activities = createMockPlusActivities(numBefore, numAfter, numInRange, after, before); + doAnswer(new Answer() { + @Override + public Plus.Activities answer(InvocationOnMock invocationOnMock) throws Throwable { return activities; + } + }).when(plus).activities(); + return plus; + } + + private Plus.Activities createMockPlusActivities( + final int numBefore, + final int numAfter, + final int numInRange, + final DateTime after, + final DateTime before) { + Plus.Activities activities = mock(Plus.Activities.class); + try { + Plus.Activities.List list = createMockPlusActivitiesList(numBefore, numAfter, numInRange, after, before); + when(activities.list(anyString(), anyString())).thenReturn(list); + } catch (IOException ioe) { + fail("Should not have thrown exception while creating mock. : " + ioe.getMessage()); } + return activities; + } - private Plus.Activities.List createMockPlusActivitiesList(final int numBefore, final int numAfter, final int numInRange, final DateTime after, final DateTime before) { - Plus.Activities.List list = mock(Plus.Activities.List.class); - when(list.setMaxResults(anyLong())).thenReturn(list); - when(list.setPageToken(anyString())).thenReturn(list); - ActivityFeedAnswer answer = new ActivityFeedAnswer(numBefore, numAfter, numInRange, after, before); - try { - doAnswer(answer).when(list).execute(); - } catch (IOException ioe) { - fail("Should not have thrown exception while creating mock. : "+ioe.getMessage()); - } - return list; + private Plus.Activities.List createMockPlusActivitiesList( + final int numBefore, + final int numAfter, + final int numInRange, + final DateTime after, + final DateTime before) { + Plus.Activities.List list = mock(Plus.Activities.List.class); + when(list.setMaxResults(anyLong())).thenReturn(list); + when(list.setPageToken(anyString())).thenReturn(list); + ActivityFeedAnswer answer = new ActivityFeedAnswer(numBefore, numAfter, numInRange, after, before); + try { + doAnswer(answer).when(list).execute(); + } catch (IOException ioe) { + fail("Should not have thrown exception while creating mock. : " + ioe.getMessage()); } + return list; + } - private static ActivityFeed createMockActivityFeed(int numBefore, int numAfter, int numInRange, DateTime after, DateTime before, boolean page) { - ActivityFeed feed = new ActivityFeed(); - List<Activity> list = Lists.newLinkedList(); - for(int i=0; i < numAfter; ++i) { - DateTime published = before.plus(randomIntBetween(0, Integer.MAX_VALUE)); - Activity activity = createActivityWithPublishedDate(published); - list.add(activity); - } - for(int i=0; i < numInRange; ++i) { - DateTime published = null; - if((before == null && after == null) || before == null) { - published = DateTime.now(); // no date range or end time date range so just make the time now. - } else if(after == null) { - published = before.minusMillis(randomIntBetween(1, Integer.MAX_VALUE)); //no beginning to range - } else { // has to be in range - long range = before.getMillis() - after.getMillis(); - published = after.plus(range / 2); //in the middle - } - Activity activity = createActivityWithPublishedDate(published); - activity.setTitle(IN_RANGE_IDENTIFIER); - list.add(activity); - } - for(int i=0; i < numBefore; ++i) { - DateTime published = after.minusMillis(randomIntBetween(1, Integer.MAX_VALUE)); - Activity activity = createActivityWithPublishedDate(published); - list.add(activity); - } - if(page) { - feed.setNextPageToken("A"); - } else { - feed.setNextPageToken(null); - } - feed.setItems(list); - return feed; + private static ActivityFeed createMockActivityFeed( + int numBefore, + int numAfter, + int numInRange, + DateTime after, + DateTime before, + boolean page) { + ActivityFeed feed = new ActivityFeed(); + List<Activity> list = Lists.newLinkedList(); + for (int i = 0; i < numAfter; ++i) { + DateTime published = before.plus(randomIntBetween(0, Integer.MAX_VALUE)); + Activity activity = createActivityWithPublishedDate(published); + list.add(activity); } - - private static Activity createActivityWithPublishedDate(DateTime dateTime) { - Activity activity = new Activity(); - activity.setPublished(new com.google.api.client.util.DateTime(dateTime.getMillis())); - activity.setId("a"); - return activity; + for (int i = 0; i < numInRange; ++i) { + DateTime published = null; + if ((before == null && after == null) || before == null) { + published = DateTime.now(); // no date range or end time date range so just make the time now. + } else if (after == null) { + published = before.minusMillis(randomIntBetween(1, Integer.MAX_VALUE)); //no beginning to range + } else { // has to be in range + long range = before.getMillis() - after.getMillis(); + published = after.plus(range / 2); //in the middle + } + Activity activity = createActivityWithPublishedDate(published); + activity.setTitle(IN_RANGE_IDENTIFIER); + list.add(activity); } + for (int i = 0; i < numBefore; ++i) { + DateTime published = after.minusMillis(randomIntBetween(1, Integer.MAX_VALUE)); + Activity activity = createActivityWithPublishedDate(published); + list.add(activity); + } + if (page) { + feed.setNextPageToken("A"); + } else { + feed.setNextPageToken(null); + } + feed.setItems(list); + return feed; + } - private static class ActivityFeedAnswer implements Answer<ActivityFeed> { - private int afterCount = 0; - private int beforeCount = 0; - private int inCount = 0; - private int maxBatch = 100; + private static Activity createActivityWithPublishedDate(DateTime dateTime) { + Activity activity = new Activity(); + activity.setPublished(new com.google.api.client.util.DateTime(dateTime.getMillis())); + activity.setId("a"); + return activity; + } - private int numAfter; - private int numInRange; - private int numBefore; - private DateTime after; - private DateTime before; + private static class ActivityFeedAnswer implements Answer<ActivityFeed> { + private int afterCount = 0; + private int beforeCount = 0; + private int inCount = 0; + private int maxBatch = 100; - private ActivityFeedAnswer(int numBefore, int numAfter, int numInRange, DateTime after, DateTime before) { - this.numBefore = numBefore; - this.numAfter = numAfter; - this.numInRange = numInRange; - this.after = after; - this.before = before; - } + private int numAfter; + private int numInRange; + private int numBefore; + private DateTime after; + private DateTime before; + private ActivityFeedAnswer(int numBefore, int numAfter, int numInRange, DateTime after, DateTime before) { + this.numBefore = numBefore; + this.numAfter = numAfter; + this.numInRange = numInRange; + this.after = after; + this.before = before; + } - @Override - public ActivityFeed answer(InvocationOnMock invocationOnMock) throws Throwable { - int totalCount = 0; - int batchAfter = 0; - int batchBefore = 0; - int batchIn = 0; - if(afterCount != numAfter) { - if(numAfter - afterCount >= maxBatch) { - afterCount += maxBatch; - batchAfter += maxBatch; - totalCount += batchAfter; - } else { - batchAfter += numAfter - afterCount; - totalCount += numAfter - afterCount; - afterCount = numAfter; - } - } - if(totalCount < maxBatch && inCount != numInRange) { - if(numInRange - inCount >= maxBatch - totalCount) { - inCount += maxBatch - totalCount; - batchIn += maxBatch - totalCount; - totalCount += batchIn; - } else { - batchIn += numInRange - inCount; - totalCount += numInRange - inCount; - inCount = numInRange; - } - } - if(totalCount < maxBatch && beforeCount != numBefore) { - if(numBefore - batchBefore >= maxBatch - totalCount) { - batchBefore += maxBatch - totalCount; - totalCount = maxBatch; - beforeCount +=batchBefore; - } else { - batchBefore += numBefore - beforeCount; - totalCount += numBefore - beforeCount; - beforeCount = numBefore; - } - } - return createMockActivityFeed(batchBefore, batchAfter, batchIn, after, before, numAfter != afterCount || inCount != numInRange || beforeCount != numBefore); + @Override + public ActivityFeed answer(InvocationOnMock invocationOnMock) throws Throwable { + int totalCount = 0; + int batchAfter = 0; + int batchBefore = 0; + int batchIn = 0; + if (afterCount != numAfter) { + if (numAfter - afterCount >= maxBatch) { + afterCount += maxBatch; + batchAfter += maxBatch; + totalCount += batchAfter; + } else { + batchAfter += numAfter - afterCount; + totalCount += numAfter - afterCount; + afterCount = numAfter; } + } + if (totalCount < maxBatch && inCount != numInRange) { + if (numInRange - inCount >= maxBatch - totalCount) { + inCount += maxBatch - totalCount; + batchIn += maxBatch - totalCount; + totalCount += batchIn; + } else { + batchIn += numInRange - inCount; + totalCount += numInRange - inCount; + inCount = numInRange; + } + } + if (totalCount < maxBatch && beforeCount != numBefore) { + if (numBefore - batchBefore >= maxBatch - totalCount) { + batchBefore += maxBatch - totalCount; + totalCount = maxBatch; + beforeCount += batchBefore; + } else { + batchBefore += numBefore - beforeCount; + totalCount += numBefore - beforeCount; + beforeCount = numBefore; + } + } + + return createMockActivityFeed( + batchBefore, + batchAfter, + batchIn, + after, + before, + numAfter != afterCount || inCount != numInRange || beforeCount != numBefore); } + }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java index 1251b9a..4460fb1 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java @@ -19,13 +19,15 @@ package com.google.gplus.provider; -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.services.plus.Plus; -import com.google.api.services.plus.model.Person; import org.apache.streams.core.StreamsDatum; import org.apache.streams.google.gplus.configuration.UserInfo; import org.apache.streams.util.api.requests.backoff.BackOffStrategy; import org.apache.streams.util.api.requests.backoff.impl.ConstantTimeBackOffStrategy; + +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.services.plus.Plus; +import com.google.api.services.plus.model.Person; + import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -40,110 +42,106 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; /** - * Basic Units for {@link com.google.gplus.provider.GPlusUserDataCollector} + * Basic Units for {@link com.google.gplus.provider.GPlusUserDataCollector}. */ public class TestGPlusUserDataCollector { - private static final String NO_ERROR = "no error"; - - - /** - * Test that on success a datum will be added to the queue. - * @throws Exception - */ - @Test - public void testSucessfullPull() throws Exception { - Plus plus = createMockPlus(0, null); - BackOffStrategy backOff = new ConstantTimeBackOffStrategy(1); - BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>(); - UserInfo user = new UserInfo(); - user.setUserId("A"); - - GPlusUserDataCollector collector = new GPlusUserDataCollector(plus, backOff, datums, user); - collector.run(); - - assertEquals(1, datums.size()); - StreamsDatum datum = datums.take(); - assertNotNull(datum); - assertEquals(NO_ERROR, datum.getId()); - assertNotNull(datum.getDocument()); - assertTrue(datum.getDocument() instanceof String); - } - - /** - * Test that on failure, no datums are output - * @throws Exception - */ - @Test - public void testFail() throws Exception { - Plus plus = createMockPlus(3, mock(GoogleJsonResponseException.class)); - UserInfo user = new UserInfo(); - user.setUserId("A"); - BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>(); - BackOffStrategy backOffStrategy = new ConstantTimeBackOffStrategy(1); - - GPlusUserDataCollector collector = new GPlusUserDataCollector(plus, backOffStrategy, datums, user); - collector.run(); - - assertEquals(0, datums.size()); - } - - - - private Plus createMockPlus(final int succedOnTry, final Throwable throwable) { - Plus plus = mock(Plus.class); - doAnswer(new Answer() { - @Override - public Plus.People answer(InvocationOnMock invocationOnMock) throws Throwable { - return createMockPeople(succedOnTry, throwable); - } - }).when(plus).people(); - return plus; - } - - private Plus.People createMockPeople(final int succedOnTry, final Throwable throwable) { - Plus.People people = mock(Plus.People.class); - try { - when(people.get(anyString())).thenAnswer(new Answer<Plus.People.Get>() { - @Override - public Plus.People.Get answer(InvocationOnMock invocationOnMock) throws Throwable { - return createMockGetNoError(succedOnTry, throwable); - } - }); - } catch (IOException ioe) { - fail("No Excpetion should have been thrown while creating mocks"); + private static final String NO_ERROR = "no error"; + + /** + * Test that on success a datum will be added to the queue. + * @throws Exception Exception + */ + @Test + public void testSucessfullPull() throws Exception { + Plus plus = createMockPlus(0, null); + BackOffStrategy backOff = new ConstantTimeBackOffStrategy(1); + BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>(); + UserInfo user = new UserInfo(); + user.setUserId("A"); + + GPlusUserDataCollector collector = new GPlusUserDataCollector(plus, backOff, datums, user); + collector.run(); + + assertEquals(1, datums.size()); + StreamsDatum datum = datums.take(); + assertNotNull(datum); + assertEquals(NO_ERROR, datum.getId()); + assertNotNull(datum.getDocument()); + assertTrue(datum.getDocument() instanceof String); + } + + /** + * Test that on failure, no datums are output. + * @throws Exception Exception + */ + @Test + public void testFail() throws Exception { + Plus plus = createMockPlus(3, mock(GoogleJsonResponseException.class)); + UserInfo user = new UserInfo(); + user.setUserId("A"); + BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>(); + BackOffStrategy backOffStrategy = new ConstantTimeBackOffStrategy(1); + + GPlusUserDataCollector collector = new GPlusUserDataCollector(plus, backOffStrategy, datums, user); + collector.run(); + + assertEquals(0, datums.size()); + } + + private Plus createMockPlus(final int succedOnTry, final Throwable throwable) { + Plus plus = mock(Plus.class); + doAnswer(new Answer() { + @Override + public Plus.People answer(InvocationOnMock invocationOnMock) throws Throwable { + return createMockPeople(succedOnTry, throwable); + } + }).when(plus).people(); + return plus; + } + + private Plus.People createMockPeople(final int succedOnTry, final Throwable throwable) { + Plus.People people = mock(Plus.People.class); + try { + when(people.get(anyString())).thenAnswer(new Answer<Plus.People.Get>() { + @Override + public Plus.People.Get answer(InvocationOnMock invocationOnMock) throws Throwable { + return createMockGetNoError(succedOnTry, throwable); } - return people; + }); + } catch (IOException ioe) { + fail("No Excpetion should have been thrown while creating mocks"); } - - private Plus.People.Get createMockGetNoError(final int succedOnTry, final Throwable throwable) { - Plus.People.Get get = mock(Plus.People.Get.class); - try { - doAnswer(new Answer() { - private int counter =0; - - @Override - public Person answer(InvocationOnMock invocationOnMock) throws Throwable { - if(counter == succedOnTry) { - Person p = new Person(); - p.setId(NO_ERROR); - return p; - } else { - ++counter; - throw throwable; - } - } - }).when(get).execute(); - } catch (IOException ioe) { - fail("No Excpetion should have been thrown while creating mocks"); + return people; + } + + private Plus.People.Get createMockGetNoError(final int succedOnTry, final Throwable throwable) { + Plus.People.Get get = mock(Plus.People.Get.class); + try { + doAnswer(new Answer() { + private int counter = 0; + + @Override + public Person answer(InvocationOnMock invocationOnMock) throws Throwable { + if (counter == succedOnTry) { + Person person = new Person(); + person.setId(NO_ERROR); + return person; + } else { + ++counter; + throw throwable; + } } - return get; + }).when(get).execute(); + } catch (IOException ioe) { + fail("No Excpetion should have been thrown while creating mocks"); } + return get; + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java index 8b4c29b..96a9d89 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java @@ -18,53 +18,58 @@ package com.google.gplus.serializer.util; +import org.apache.streams.jackson.StreamsJacksonMapper; + import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.api.services.plus.model.Activity; import com.google.api.services.plus.model.Person; -import org.apache.streams.jackson.StreamsJacksonMapper; import org.junit.Test; import static org.junit.Assert.assertEquals; +/** + * GPlusEventClassifierTest tests GPlusEventClassifier. + */ public class GPlusEventClassifierTest { - private static StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance(); - @Test - public void classifyActivityTest() { - try { - Activity activity = new Activity(); - activity.setKind("plus#activity"); - Class retClass = GPlusEventClassifier.detectClass(mapper.writeValueAsString(activity)); + private static StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance(); + + @Test + public void classifyActivityTest() { + try { + Activity activity = new Activity(); + activity.setKind("plus#activity"); + Class retClass = GPlusEventClassifier.detectClass(mapper.writeValueAsString(activity)); - assertEquals(retClass, Activity.class); - } catch(Exception e) { - // - } + assertEquals(retClass, Activity.class); + } catch (Exception ex) { + // } + } - @Test - public void classifyPersonTest() { - try { - Person person = new Person(); - person.setKind("plus#person"); - Class retClass = GPlusEventClassifier.detectClass(mapper.writeValueAsString(person)); + @Test + public void classifyPersonTest() { + try { + Person person = new Person(); + person.setKind("plus#person"); + Class retClass = GPlusEventClassifier.detectClass(mapper.writeValueAsString(person)); - assertEquals(retClass, Person.class); - } catch(Exception e) { - // - } + assertEquals(retClass, Person.class); + } catch (Exception ex) { + // } + } - @Test - public void classifObjectNodeTest() { - try { - Person person = new Person(); - person.setKind("fake"); - Class retClass = GPlusEventClassifier.detectClass(mapper.writeValueAsString(person)); + @Test + public void classifyObjectNodeTest() { + try { + Person person = new Person(); + person.setKind("fake"); + Class retClass = GPlusEventClassifier.detectClass(mapper.writeValueAsString(person)); - assertEquals(retClass, ObjectNode.class); - } catch(Exception e) { - // - } + assertEquals(retClass, ObjectNode.class); + } catch (Exception ex) { + // } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java index 51caa64..4b642ab 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java @@ -29,39 +29,39 @@ import java.io.LineNumberReader; public class GPlusUserActivityProviderIT { - private static final Logger LOGGER = LoggerFactory.getLogger(GPlusUserActivityProviderIT.class); + private static final Logger LOGGER = LoggerFactory.getLogger(GPlusUserActivityProviderIT.class); - @Test - public void testGPlusUserActivityProvider() throws Exception { + @Test + public void testGPlusUserActivityProvider() throws Exception { - String configfile = "./target/test-classes/GPlusUserActivityProviderIT.conf"; - String outfile = "./target/test-classes/GPlusUserActivityProviderIT.stdout.txt"; + String configfile = "./target/test-classes/GPlusUserActivityProviderIT.conf"; + String outfile = "./target/test-classes/GPlusUserActivityProviderIT.stdout.txt"; - String[] args = new String[2]; - args[0] = configfile; - args[1] = outfile; + String[] args = new String[2]; + args[0] = configfile; + args[1] = outfile; - Thread testThread = new Thread((Runnable) () -> { - try { - GPlusUserActivityProvider.main(args); - } catch( Exception e ) { - LOGGER.error("Test Exception!", e); - } - }); - testThread.start(); - testThread.join(60000); + Thread testThread = new Thread((Runnable) () -> { + try { + GPlusUserActivityProvider.main(args); + } catch ( Exception ex ) { + LOGGER.error("Test Exception!", ex); + } + }); + testThread.start(); + testThread.join(60000); - File out = new File(outfile); - assert (out.exists()); - assert (out.canRead()); - assert (out.isFile()); + File out = new File(outfile); + assert (out.exists()); + assert (out.canRead()); + assert (out.isFile()); - FileReader outReader = new FileReader(out); - LineNumberReader outCounter = new LineNumberReader(outReader); + FileReader outReader = new FileReader(out); + LineNumberReader outCounter = new LineNumberReader(outReader); - while(outCounter.readLine() != null) {} + while (outCounter.readLine() != null) {} - assert (outCounter.getLineNumber() >= 1); + assert (outCounter.getLineNumber() >= 1); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java index b367baa..fd4ddd5 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java @@ -30,41 +30,41 @@ import java.io.LineNumberReader; public class GPlusUserDataProviderIT { - private static final Logger LOGGER = LoggerFactory.getLogger(GPlusUserDataProviderIT.class); + private static final Logger LOGGER = LoggerFactory.getLogger(GPlusUserDataProviderIT.class); - @Test - public void testGPlusUserDataProvider() throws Exception { + @Test + public void testGPlusUserDataProvider() throws Exception { - String configfile = "./target/test-classes/GPlusUserDataProviderIT.conf"; - String outfile = "./target/test-classes/GPlusUserDataProviderIT.stdout.txt"; + String configfile = "./target/test-classes/GPlusUserDataProviderIT.conf"; + String outfile = "./target/test-classes/GPlusUserDataProviderIT.stdout.txt"; - String[] args = new String[2]; - args[0] = configfile; - args[1] = outfile; + String[] args = new String[2]; + args[0] = configfile; + args[1] = outfile; - Thread testThread = new Thread((Runnable) () -> { - try { - GPlusUserDataProvider.main(args); - } catch( Exception e ) { - LOGGER.error("Test Exception!", e); - } - }); - testThread.start(); - testThread.join(60000); + Thread testThread = new Thread((Runnable) () -> { + try { + GPlusUserDataProvider.main(args); + } catch ( Exception ex ) { + LOGGER.error("Test Exception!", ex); + } + }); + testThread.start(); + testThread.join(60000); - GPlusUserDataProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2])); + GPlusUserDataProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2])); - File out = new File(outfile); - assert (out.exists()); - assert (out.canRead()); - assert (out.isFile()); + File out = new File(outfile); + assert (out.exists()); + assert (out.canRead()); + assert (out.isFile()); - FileReader outReader = new FileReader(out); - LineNumberReader outCounter = new LineNumberReader(outReader); + FileReader outReader = new FileReader(out); + LineNumberReader outCounter = new LineNumberReader(outReader); - while(outCounter.readLine() != null) {} + while (outCounter.readLine() != null) {} - assert (outCounter.getLineNumber() >= 1); + assert (outCounter.getLineNumber() >= 1); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java index 17af5f6..6fd6b4e 100644 --- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java +++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java @@ -18,88 +18,91 @@ package org.apache.streams.instagram.processor; -import com.google.common.collect.Lists; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.instagram.serializer.InstagramMediaFeedDataConverter; import org.apache.streams.instagram.serializer.InstagramUserInfoDataConverter; -import org.apache.streams.instagram.serializer.util.InstagramActivityUtil; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; + +import com.google.common.collect.Lists; import org.jinstagram.entity.users.basicinfo.UserInfoData; import org.jinstagram.entity.users.feed.MediaFeedData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; -import java.util.Queue; /** - * This is deprecated - use ActivityConverterProcessor or ActivityObjectConverterProcessor + * This is deprecated - use ActivityConverterProcessor or ActivityObjectConverterProcessor. */ @Deprecated public class InstagramTypeConverter implements StreamsProcessor { - public final static String STREAMS_ID = "InstagramTypeConverter"; + public static final String STREAMS_ID = "InstagramTypeConverter"; - private final static Logger LOGGER = LoggerFactory.getLogger(InstagramTypeConverter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(InstagramTypeConverter.class); - private InstagramMediaFeedDataConverter mediaFeedDataConverter; - private InstagramUserInfoDataConverter userInfoDataConverter; + private InstagramMediaFeedDataConverter mediaFeedDataConverter; + private InstagramUserInfoDataConverter userInfoDataConverter; - public final static String TERMINATE = new String("TERMINATE"); - - @Override - public String getId() { - return STREAMS_ID; - } + public static final String TERMINATE = new String("TERMINATE"); - @Override - public List<StreamsDatum> process(StreamsDatum entry) { + @Override + public String getId() { + return STREAMS_ID; + } - StreamsDatum result = null; + @Override + public List<StreamsDatum> process(StreamsDatum entry) { - try { - Object item = entry.getDocument(); + StreamsDatum result = null; - LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass()); - if(item instanceof MediaFeedData) { + try { + Object item = entry.getDocument(); - //We don't need to use the mapper, since we have a process to convert between - //MediaFeedData objects and Activity objects already - List<Activity> activity = mediaFeedDataConverter.toActivityList((MediaFeedData)item); + LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass()); + if (item instanceof MediaFeedData) { - if( activity.size() > 0 ) result = new StreamsDatum(activity); + //We don't need to use the mapper, since we have a process to convert between + //MediaFeedData objects and Activity objects already + List<Activity> activity = mediaFeedDataConverter.toActivityList((MediaFeedData)item); - } else if(item instanceof UserInfoData) { - - ActivityObject activityObject = userInfoDataConverter.toActivityObject((UserInfoData)item); + if ( activity.size() > 0 ) { + result = new StreamsDatum(activity); + } - if( activityObject != null ) result = new StreamsDatum(activityObject); + } else if (item instanceof UserInfoData) { - } + ActivityObject activityObject = userInfoDataConverter.toActivityObject((UserInfoData)item); - } catch (Exception e) { - e.printStackTrace(); - LOGGER.error("Exception while converting item: {}", e.getMessage()); + if ( activityObject != null ) { + result = new StreamsDatum(activityObject); } - if( result != null ) { - return Lists.newArrayList(result); - } else - return Lists.newArrayList(); - - } + } - @Override - public void prepare(Object o) { - mediaFeedDataConverter = new InstagramMediaFeedDataConverter(); - userInfoDataConverter = new InstagramUserInfoDataConverter(); + } catch (Exception ex) { + ex.printStackTrace(); + LOGGER.error("Exception while converting item: {}", ex.getMessage()); } - @Override - public void cleanUp() { - //noop + if ( result != null ) { + return Lists.newArrayList(result); + } else { + return Lists.newArrayList(); } + } + + @Override + public void prepare(Object configurationObject) { + mediaFeedDataConverter = new InstagramMediaFeedDataConverter(); + userInfoDataConverter = new InstagramUserInfoDataConverter(); + } + + @Override + public void cleanUp() { + //noop + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java index 0c7ba95..025af18 100644 --- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java +++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java @@ -12,12 +12,9 @@ software distributed under the License is distributed on an KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + package org.apache.streams.instagram.provider; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; @@ -28,6 +25,11 @@ import org.apache.streams.instagram.User; import org.apache.streams.instagram.UsersInfo; import org.apache.streams.util.ComponentUtils; import org.apache.streams.util.SerializationUtil; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,178 +55,172 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public abstract class InstagramAbstractProvider implements StreamsProvider { - public static final String STREAMS_ID = "InstagramAbstractProvider"; - - private static final Logger LOGGER = LoggerFactory.getLogger(InstagramAbstractProvider.class); - - private static final int MAX_BATCH_SIZE = 2000; - - protected InstagramConfiguration config; - protected Queue<StreamsDatum> dataQueue; - private ListeningExecutorService executorService; - - private List<ListenableFuture<Object>> futures = new ArrayList<>(); - - private AtomicBoolean isCompleted; - - public InstagramAbstractProvider() { - this.config = new ComponentConfigurator<>(InstagramConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("instagram")); - } - - public InstagramAbstractProvider(InstagramConfiguration config) { - this.config = SerializationUtil.cloneBySerialization(config); - } - - public static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { - return new ThreadPoolExecutor(nThreads, nThreads, - 5000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); - } - - @Override - public String getId() { - return STREAMS_ID; - } - - @Override - public void startStream() { - InstagramDataCollector dataCollector = getInstagramDataCollector(); - this.executorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); - ListenableFuture future = this.executorService.submit(dataCollector); - this.futures.add(future); - executorService.shutdown(); - } - - /** - * Return the data collector to use to connect to instagram. - * @return {@link InstagramDataCollector} - */ - protected abstract InstagramDataCollector getInstagramDataCollector(); - - - @Override - public StreamsResultSet readCurrent() { - Queue<StreamsDatum> batch = new ConcurrentLinkedQueue<>(); - int count = 0; - while(!this.dataQueue.isEmpty() && count < MAX_BATCH_SIZE) { - ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.dataQueue), batch); - ++count; - } - return new StreamsResultSet(batch); - } - - @Override - public StreamsResultSet readNew(BigInteger sequence) { - return null; - } - - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return null; - } - - @Override - public void prepare(Object configurationObject) { - this.dataQueue = new ConcurrentLinkedQueue<>(); - this.isCompleted = new AtomicBoolean(false); - } - - @Override - public void cleanUp() { - try { - ComponentUtils.shutdownExecutor(this.executorService, 5, 5); - } finally { - this.executorService = null; - } - } - - /** - * Add default start and stop points if necessary. - */ - private void updateUserInfoList() { - UsersInfo usersInfo = this.config.getUsersInfo(); - if(usersInfo.getDefaultAfterDate() == null && usersInfo.getDefaultBeforeDate() == null) { - return; - } - DateTime defaultAfterDate = usersInfo.getDefaultAfterDate(); - DateTime defaultBeforeDate = usersInfo.getDefaultBeforeDate(); - for(User user : usersInfo.getUsers()) { - if(defaultAfterDate != null && user.getAfterDate() == null) { - user.setAfterDate(defaultAfterDate); - } - if(defaultBeforeDate != null && user.getBeforeDate() == null) { - user.setBeforeDate(defaultBeforeDate); - } - } - } - - /** - * Overrides the client id in the configuration. - * @param clientId client id to use - */ - public void setInstagramClientId(String clientId) { - this.config.setClientId(clientId); - } - - /** - * Overrides authroized user tokens in the configuration. - * @param tokenStrings - */ - public void setAuthorizedUserTokens(Collection<String> tokenStrings) { - ensureUsersInfo(this.config).setAuthorizedTokens(new HashSet<>(tokenStrings)); - } - - /** - * Overrides the default before date in the configuration - * @param beforeDate - */ - public void setDefaultBeforeDate(DateTime beforeDate) { - ensureUsersInfo(this.config).setDefaultBeforeDate(beforeDate); - } - - /** - * Overrides the default after date in the configuration - * @param afterDate - */ - public void setDefaultAfterDate(DateTime afterDate) { - ensureUsersInfo(this.config).setDefaultAfterDate(afterDate); - } - - /** - * Overrides the users in the configuration and sets the after date for each user. A NULL DateTime implies - * pull data from as early as possible. If default before or after DateTimes are set, they will applied to all - * NULL DateTimes. - * @param usersWithAfterDate instagram user id mapped to BeforeDate time - */ - public void setUsersWithAfterDate(Map<String, DateTime> usersWithAfterDate) { - Set<User> users = new HashSet<>(); - for(String userId : usersWithAfterDate.keySet()) { - User user = new User(); - user.setUserId(userId); - user.setAfterDate(usersWithAfterDate.get(userId)); - users.add(user); - } - ensureUsersInfo(this.config).setUsers(users); - } - - private UsersInfo ensureUsersInfo(InstagramConfiguration config) { - UsersInfo usersInfo = config.getUsersInfo(); - if(usersInfo == null) { - usersInfo = new UsersInfo(); - config.setUsersInfo(usersInfo); - } - return usersInfo; - } - - @Override - public boolean isRunning() { - if (dataQueue.isEmpty() && executorService.isTerminated() && Futures.allAsList(futures).isDone()) { - LOGGER.info("Completed"); - isCompleted.set(true); - LOGGER.info("Exiting"); - } - return !isCompleted.get(); - } + public static final String STREAMS_ID = "InstagramAbstractProvider"; + + private static final Logger LOGGER = LoggerFactory.getLogger(InstagramAbstractProvider.class); + + private static final int MAX_BATCH_SIZE = 2000; + + protected InstagramConfiguration config; + protected Queue<StreamsDatum> dataQueue; + private ListeningExecutorService executorService; + + private List<ListenableFuture<Object>> futures = new ArrayList<>(); + + private AtomicBoolean isCompleted; + + public InstagramAbstractProvider() { + this.config = new ComponentConfigurator<>(InstagramConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("instagram")); + } + + public InstagramAbstractProvider(InstagramConfiguration config) { + this.config = SerializationUtil.cloneBySerialization(config); + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void startStream() { + InstagramDataCollector dataCollector = getInstagramDataCollector(); + this.executorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListenableFuture future = this.executorService.submit(dataCollector); + this.futures.add(future); + executorService.shutdown(); + } + + /** + * Return the data collector to use to connect to instagram. + * @return {@link InstagramDataCollector} + */ + protected abstract InstagramDataCollector getInstagramDataCollector(); + + + @Override + public StreamsResultSet readCurrent() { + Queue<StreamsDatum> batch = new ConcurrentLinkedQueue<>(); + int count = 0; + while (!this.dataQueue.isEmpty() && count < MAX_BATCH_SIZE) { + ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.dataQueue), batch); + ++count; + } + return new StreamsResultSet(batch); + } + + @Override + public StreamsResultSet readNew(BigInteger sequence) { + return null; + } + + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + return null; + } + + @Override + public void prepare(Object configurationObject) { + this.dataQueue = new ConcurrentLinkedQueue<>(); + this.isCompleted = new AtomicBoolean(false); + } + + @Override + public void cleanUp() { + try { + ComponentUtils.shutdownExecutor(this.executorService, 5, 5); + } finally { + this.executorService = null; + } + } + + /** + * Add default start and stop points if necessary. + */ + private void updateUserInfoList() { + UsersInfo usersInfo = this.config.getUsersInfo(); + if (usersInfo.getDefaultAfterDate() == null && usersInfo.getDefaultBeforeDate() == null) { + return; + } + DateTime defaultAfterDate = usersInfo.getDefaultAfterDate(); + DateTime defaultBeforeDate = usersInfo.getDefaultBeforeDate(); + for (User user : usersInfo.getUsers()) { + if (defaultAfterDate != null && user.getAfterDate() == null) { + user.setAfterDate(defaultAfterDate); + } + if (defaultBeforeDate != null && user.getBeforeDate() == null) { + user.setBeforeDate(defaultBeforeDate); + } + } + } + + /** + * Overrides the client id in the configuration. + * @param clientId client id to use + */ + public void setInstagramClientId(String clientId) { + this.config.setClientId(clientId); + } + + /** + * Overrides authroized user tokens in the configuration. + * @param tokenStrings tokenStrings + */ + public void setAuthorizedUserTokens(Collection<String> tokenStrings) { + ensureUsersInfo(this.config).setAuthorizedTokens(new HashSet<>(tokenStrings)); + } + + /** + * Overrides the default before date in the configuration. + * @param beforeDate beforeDate + */ + public void setDefaultBeforeDate(DateTime beforeDate) { + ensureUsersInfo(this.config).setDefaultBeforeDate(beforeDate); + } + + /** + * Overrides the default after date in the configuration. + * @param afterDate afterDate + */ + public void setDefaultAfterDate(DateTime afterDate) { + ensureUsersInfo(this.config).setDefaultAfterDate(afterDate); + } + + /** + * Overrides the users in the configuration and sets the after date for each user. A NULL DateTime implies + * pull data from as early as possible. If default before or after DateTimes are set, they will applied to all + * NULL DateTimes. + * @param usersWithAfterDate instagram user id mapped to BeforeDate time + */ + public void setUsersWithAfterDate(Map<String, DateTime> usersWithAfterDate) { + Set<User> users = new HashSet<>(); + for (String userId : usersWithAfterDate.keySet()) { + User user = new User(); + user.setUserId(userId); + user.setAfterDate(usersWithAfterDate.get(userId)); + users.add(user); + } + ensureUsersInfo(this.config).setUsers(users); + } + + private UsersInfo ensureUsersInfo(InstagramConfiguration config) { + UsersInfo usersInfo = config.getUsersInfo(); + if (usersInfo == null) { + usersInfo = new UsersInfo(); + config.setUsersInfo(usersInfo); + } + return usersInfo; + } + + @Override + public boolean isRunning() { + if (dataQueue.isEmpty() && executorService.isTerminated() && Futures.allAsList(futures).isDone()) { + LOGGER.info("Completed"); + isCompleted.set(true); + LOGGER.info("Exiting"); + } + return !isCompleted.get(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java index 97451f0..1916061 100644 --- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java +++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java @@ -12,9 +12,9 @@ software distributed under the License is distributed on an KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + package org.apache.streams.instagram.provider; -import com.google.common.annotations.VisibleForTesting; import org.apache.streams.core.StreamsDatum; import org.apache.streams.instagram.InstagramConfiguration; import org.apache.streams.instagram.User; @@ -22,13 +22,9 @@ import org.apache.streams.util.ComponentUtils; import org.apache.streams.util.api.requests.backoff.BackOffStrategy; import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy; import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager; -import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger; +import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManager; + import org.jinstagram.Instagram; -import org.jinstagram.entity.common.Pagination; -import org.jinstagram.entity.users.feed.MediaFeed; -import org.jinstagram.entity.users.feed.MediaFeedData; -import org.jinstagram.exceptions.InstagramBadRequestException; -import org.jinstagram.exceptions.InstagramRateLimitException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,102 +40,106 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public abstract class InstagramDataCollector<T> implements Runnable { - private static final Logger LOGGER = LoggerFactory.getLogger(InstagramDataCollector.class); - - protected Queue<StreamsDatum> dataQueue; //exposed for testing - private InstagramConfiguration config; - private AtomicBoolean isCompleted; - private SimpleTokenManager<InstagramOauthToken> tokenManger; - protected int consecutiveErrorCount; - protected BackOffStrategy backOffStrategy; - private Instagram instagram; - - - public InstagramDataCollector(Queue<StreamsDatum> queue, InstagramConfiguration config) { - this.dataQueue = queue; - this.config = config; - this.isCompleted = new AtomicBoolean(false); - this.tokenManger = new BasicTokenManger<InstagramOauthToken>(); - for (String tokens : this.config.getUsersInfo().getAuthorizedTokens()) { - this.tokenManger.addTokenToPool(new InstagramOauthToken(tokens)); - } - this.consecutiveErrorCount = 0; - this.backOffStrategy = new ExponentialBackOffStrategy(2); - this.instagram = new Instagram(this.config.getClientId()); + private static final Logger LOGGER = LoggerFactory.getLogger(InstagramDataCollector.class); + + protected Queue<StreamsDatum> dataQueue; //exposed for testing + private InstagramConfiguration config; + private AtomicBoolean isCompleted; + private SimpleTokenManager<InstagramOauthToken> tokenManger; + protected int consecutiveErrorCount; + protected BackOffStrategy backOffStrategy; + private Instagram instagram; + + /** + * InstagramDataCollector constructor. + * @param queue Queue of StreamsDatum + * @param config InstagramConfiguration + */ + public InstagramDataCollector(Queue<StreamsDatum> queue, InstagramConfiguration config) { + this.dataQueue = queue; + this.config = config; + this.isCompleted = new AtomicBoolean(false); + this.tokenManger = new BasicTokenManager<InstagramOauthToken>(); + for (String tokens : this.config.getUsersInfo().getAuthorizedTokens()) { + this.tokenManger.addTokenToPool(new InstagramOauthToken(tokens)); } - - - /** - * If there are authorized tokens available, it sets a new token for the client and returns - * the client. If there are no available tokens, it simply returns the client that was - * initialized in the constructor with client id. - * @return - */ - protected Instagram getNextInstagramClient() { - if(this.tokenManger.numAvailableTokens() > 0) { - this.instagram.setAccessToken(this.tokenManger.getNextAvailableToken()); - } - return this.instagram; + this.consecutiveErrorCount = 0; + this.backOffStrategy = new ExponentialBackOffStrategy(2); + this.instagram = new Instagram(this.config.getClientId()); + } + + + /** + * If there are authorized tokens available, it sets a new token for the client and returns + * the client. If there are no available tokens, it simply returns the client that was + * initialized in the constructor with client id. + * @return result + */ + protected Instagram getNextInstagramClient() { + if (this.tokenManger.numAvailableTokens() > 0) { + this.instagram.setAccessToken(this.tokenManger.getNextAvailableToken()); } - - /** - * Return the number of available tokens for this data collector - * @return numbeer of available tokens - */ - protected int numAvailableTokens() { - return this.tokenManger.numAvailableTokens(); + return this.instagram; + } + + /** + * Return the number of available tokens for this data collector. + * @return numbeer of available tokens + */ + protected int numAvailableTokens() { + return this.tokenManger.numAvailableTokens(); + } + + /** + * Queues the Instagram data to be output by the provider. + * @param userData data to queue + * @param userId user id who the data came from + */ + protected void queueData(Collection<T> userData, String userId) { + if (userData == null) { + LOGGER.warn("User id, {}, returned a NULL data from instagram.", userId); + } else { + for (T data : userData) { + ComponentUtils.offerUntilSuccess(convertToStreamsDatum(data), this.dataQueue); + } } - - /** - * Queues the Instagram data to be output by the provider. - * @param userData data to queue - * @param userId user id who the data came from - */ - protected void queueData(Collection<T> userData, String userId) { - if (userData == null) { - LOGGER.warn("User id, {}, returned a NULL data from instagram.", userId); - } else { - for (T data : userData) { - ComponentUtils.offerUntilSuccess(convertToStreamsDatum(data), this.dataQueue); - } - } + } + + /** + * @return true when the collector has queued all of the available Instagram data for the provided users. + */ + public boolean isCompleted() { + return this.isCompleted.get(); + } + + @Override + public void run() { + for (User user : this.config.getUsersInfo().getUsers()) { + try { + collectInstagramDataForUser(user); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } catch (Exception ex) { + LOGGER.error("Exception thrown while polling for user, {}, skipping user.", user.getUserId()); + LOGGER.error("Exception thrown while polling for user : ", ex); + } } - - /** - * @return true when the collector has queued all of the available Instagram data for the provided users. - */ - public boolean isCompleted() { - return this.isCompleted.get(); - } - - @Override - public void run() { - for (User user : this.config.getUsersInfo().getUsers()) { - try { - collectInstagramDataForUser(user); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - LOGGER.error("Exception thrown while polling for user, {}, skipping user.", user.getUserId()); - LOGGER.error("Exception thrown while polling for user : ", e); - } - } - this.isCompleted.set(true); - } - - /** - * Pull instagram data for a user and queues the resulting data. - * @param user - * @throws Exception - */ - protected abstract void collectInstagramDataForUser(User user) throws Exception; - - /** - * Takes an Instagram Object and sets it as the document of a streams datum and sets the id of the streams datum. - * @param item - * @return - */ - protected abstract StreamsDatum convertToStreamsDatum(T item); + this.isCompleted.set(true); + } + + /** + * Pull instagram data for a user and queues the resulting data. + * @param user + * @throws Exception + */ + protected abstract void collectInstagramDataForUser(User user) throws Exception; + + /** + * Takes an Instagram Object and sets it as the document of a streams datum and sets the id of the streams datum. + * @param item + * @return + */ + protected abstract StreamsDatum convertToStreamsDatum(T item); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java index 4531cfe..959b240 100644 --- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java +++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java @@ -12,8 +12,8 @@ software distributed under the License is distributed on an KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package org.apache.streams.instagram.provider; +package org.apache.streams.instagram.provider; import org.jinstagram.auth.model.Token; @@ -23,21 +23,21 @@ import org.jinstagram.auth.model.Token; */ public class InstagramOauthToken extends Token { - public InstagramOauthToken(String token) { - this(token, null); - } + public InstagramOauthToken(String token) { + this(token, null); + } - public InstagramOauthToken(String token, String secret) { - super(token, secret); - } + public InstagramOauthToken(String token, String secret) { + super(token, secret); + } - @Override - public boolean equals(Object o) { - if(!(o instanceof InstagramOauthToken)) { - return false; - } - InstagramOauthToken that = (InstagramOauthToken) o; - return this.getToken().equals(that.getToken()); + @Override + public boolean equals(Object object) { + if (!(object instanceof InstagramOauthToken)) { + return false; } + InstagramOauthToken that = (InstagramOauthToken) object; + return this.getToken().equals(that.getToken()); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java index e946e6b..b1e4593 100644 --- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java +++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java @@ -12,13 +12,14 @@ software distributed under the License is distributed on an KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + package org.apache.streams.instagram.provider.recentmedia; -import com.google.common.annotations.VisibleForTesting; import org.apache.streams.core.StreamsDatum; import org.apache.streams.instagram.InstagramConfiguration; import org.apache.streams.instagram.User; import org.apache.streams.instagram.provider.InstagramDataCollector; + import org.jinstagram.entity.common.Pagination; import org.jinstagram.entity.users.feed.MediaFeed; import org.jinstagram.entity.users.feed.MediaFeedData; @@ -37,75 +38,77 @@ import java.util.Queue; */ public class InstagramRecentMediaCollector extends InstagramDataCollector<MediaFeedData> { - private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollector.class); - protected static final int MAX_ATTEMPTS = 5; + private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollector.class); + protected static final int MAX_ATTEMPTS = 5; - private int consecutiveErrorCount; + private int consecutiveErrorCount; - public InstagramRecentMediaCollector(Queue<StreamsDatum> queue, InstagramConfiguration config) { - super(queue, config); - } + public InstagramRecentMediaCollector(Queue<StreamsDatum> queue, InstagramConfiguration config) { + super(queue, config); + } - @Override - protected StreamsDatum convertToStreamsDatum(MediaFeedData item) { - return new StreamsDatum(item, item.getId()); - } + @Override + protected StreamsDatum convertToStreamsDatum(MediaFeedData item) { + return new StreamsDatum(item, item.getId()); + } - /** - * Pull Recement Media for a user and queues the resulting data. Will try a single call 5 times before failing and - * moving on to the next call or returning. - * @param user - * @throws Exception - */ - @Override - protected void collectInstagramDataForUser(User user) throws Exception { - Pagination pagination = null; - do { - int attempts = 0; - boolean succesfullDataPull = false; - while (!succesfullDataPull && attempts < MAX_ATTEMPTS) { - ++attempts; - MediaFeed feed = null; - try { - if (pagination == null) { - feed = getNextInstagramClient().getRecentMediaFeed(user.getUserId(), - 0, - null, - null, - user.getBeforeDate() == null ? null : user.getBeforeDate().toDate(), - user.getAfterDate() == null ? null : user.getAfterDate().toDate()); - } else { - feed = getNextInstagramClient().getRecentMediaNextPage(pagination); - } - } catch (Exception e) { - if(e instanceof InstagramRateLimitException) { - LOGGER.warn("Received rate limit exception from Instagram, backing off. : {}", e); - this.backOffStrategy.backOff(); - } else if(e instanceof InstagramBadRequestException) { - LOGGER.error("Received Bad Requests exception form Instagram: {}", e); - attempts = MAX_ATTEMPTS; //don't repeat bad requests. - ++this.consecutiveErrorCount; - } else { - LOGGER.error("Received Expection while attempting to poll Instagram: {}", e); - ++this.consecutiveErrorCount; - } - if(this.consecutiveErrorCount > Math.max(this.numAvailableTokens(), MAX_ATTEMPTS*2)) { - throw new Exception("InstagramCollector failed to successfully connect to instagram on "+this.consecutiveErrorCount+" attempts."); - } - } - if(succesfullDataPull = feed != null) { - this.consecutiveErrorCount = 0; - this.backOffStrategy.reset(); - pagination = feed.getPagination(); - queueData(feed.getData(), user.getUserId()); - } - } - if(!succesfullDataPull) { - LOGGER.error("Failed to get data from instagram for user id, {}, skipping user.", user.getUserId()); - } - } while (pagination != null && pagination.hasNextPage()); + /** + * Pull Recement Media for a user and queues the resulting data. Will try a single call 5 times before failing and + * moving on to the next call or returning. + * @param user user + * @throws Exception Exception + */ + @Override + protected void collectInstagramDataForUser(User user) throws Exception { + Pagination pagination = null; + do { + int attempts = 0; + boolean succesfullDataPull = false; + while (!succesfullDataPull && attempts < MAX_ATTEMPTS) { + ++attempts; + MediaFeed feed = null; + try { + if (pagination == null) { + feed = getNextInstagramClient().getRecentMediaFeed(user.getUserId(), + 0, + null, + null, + user.getBeforeDate() == null ? null : user.getBeforeDate().toDate(), + user.getAfterDate() == null ? null : user.getAfterDate().toDate()); + } else { + feed = getNextInstagramClient().getRecentMediaNextPage(pagination); + } + } catch (Exception ex) { + if ( ex instanceof InstagramRateLimitException) { + LOGGER.warn("Received rate limit exception from Instagram, backing off. : {}", ex); + this.backOffStrategy.backOff(); + } else if ( ex instanceof InstagramBadRequestException) { + LOGGER.error("Received Bad Requests exception form Instagram: {}", ex); + attempts = MAX_ATTEMPTS; //don't repeat bad requests. + ++this.consecutiveErrorCount; + } else { + LOGGER.error("Received Expection while attempting to poll Instagram: {}", ex); + ++this.consecutiveErrorCount; + } + if (this.consecutiveErrorCount > Math.max(this.numAvailableTokens(), MAX_ATTEMPTS * 2)) { + throw new Exception( + "InstagramCollector failed to successfully connect to instagram on " + this.consecutiveErrorCount + " attempts."); + } + } + if (succesfullDataPull = feed != null) { + this.consecutiveErrorCount = 0; + this.backOffStrategy.reset(); + pagination = feed.getPagination(); + queueData(feed.getData(), user.getUserId()); + } + } + if (!succesfullDataPull) { + LOGGER.error("Failed to get data from instagram for user id, {}, skipping user.", user.getUserId()); + } } + while (pagination != null && pagination.hasNextPage()); + } }