http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java index e907082..be59bd7 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java @@ -15,16 +15,9 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.facebook.provider; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Queues; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.typesafe.config.ConfigRenderOptions; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; @@ -34,6 +27,16 @@ import org.apache.streams.facebook.IdConfig; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.util.ComponentUtils; import org.apache.streams.util.SerializationUtil; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Queues; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import 
com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.typesafe.config.ConfigRenderOptions; + import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +47,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -52,103 +56,109 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public abstract class FacebookProvider implements StreamsProvider { - private final static String STREAMS_ID = "FacebookProvider"; - - private static final Logger LOGGER = LoggerFactory.getLogger(FacebookProvider.class); - private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - private static final int MAX_BATCH_SIZE = 2000; - - protected FacebookConfiguration configuration; - protected BlockingQueue<StreamsDatum> datums; - - private AtomicBoolean isComplete; - private ListeningExecutorService executor; - List<ListenableFuture<Object>> futures = new ArrayList<>(); - - private FacebookDataCollector dataCollector; - - public FacebookProvider() { - try { - this.configuration = MAPPER.readValue(StreamsConfigurator.config.getConfig("facebook").root().render(ConfigRenderOptions.concise()), FacebookConfiguration.class); - } catch (IOException ioe) { - LOGGER.error("Exception trying to read default config : {}", ioe); - } - } - - public FacebookProvider(FacebookConfiguration configuration) { - this.configuration = (FacebookConfiguration) SerializationUtil.cloneBySerialization(configuration); - } - - @Override - public String getId() { - return STREAMS_ID; - } + private static final String STREAMS_ID = "FacebookProvider"; - @Override - public void startStream() { - ListenableFuture future = executor.submit(getDataCollector()); - 
futures.add(future); - executor.shutdown(); - } + private static final Logger LOGGER = LoggerFactory.getLogger(FacebookProvider.class); + private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + private static final int MAX_BATCH_SIZE = 2000; - protected abstract FacebookDataCollector getDataCollector(); - - @Override - public StreamsResultSet readCurrent() { - int batchSize = 0; - BlockingQueue<StreamsDatum> batch = Queues.newLinkedBlockingQueue(); - while(!this.datums.isEmpty() && batchSize < MAX_BATCH_SIZE) { - ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.datums), batch); - ++batchSize; - } - return new StreamsResultSet(batch); - } + protected FacebookConfiguration configuration; + protected BlockingQueue<StreamsDatum> datums; - @Override - public StreamsResultSet readNew(BigInteger sequence) { - return null; - } + private AtomicBoolean isComplete; + private ListeningExecutorService executor; + List<ListenableFuture<Object>> futures = new ArrayList<>(); - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return null; - } + private FacebookDataCollector dataCollector; - @Override - public void prepare(Object configurationObject) { - this.datums = Queues.newLinkedBlockingQueue(); - this.isComplete = new AtomicBoolean(false); - this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + /** + * FacebookProvider constructor - resolves FacebookConfiguration from JVM 'facebook'. 
+ */ + public FacebookProvider() { + try { + this.configuration = MAPPER.readValue(StreamsConfigurator.config.getConfig("facebook").root().render(ConfigRenderOptions.concise()), FacebookConfiguration.class); + } catch (IOException ioe) { + LOGGER.error("Exception trying to read default config : {}", ioe); } - - @Override - public void cleanUp() { - ComponentUtils.shutdownExecutor(executor, 5, 5); - executor = null; + } + + /** + * FacebookProvider constructor - uses supplied FacebookConfiguration. + */ + public FacebookProvider(FacebookConfiguration configuration) { + this.configuration = (FacebookConfiguration) SerializationUtil.cloneBySerialization(configuration); + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void startStream() { + ListenableFuture future = executor.submit(getDataCollector()); + futures.add(future); + executor.shutdown(); + } + + protected abstract FacebookDataCollector getDataCollector(); + + @Override + public StreamsResultSet readCurrent() { + int batchSize = 0; + BlockingQueue<StreamsDatum> batch = Queues.newLinkedBlockingQueue(); + while (!this.datums.isEmpty() && batchSize < MAX_BATCH_SIZE) { + ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.datums), batch); + ++batchSize; } - - /** - * Overrides the ids and addedAfter time in the configuration - * @param idsToAfterDate - */ - public void overrideIds(Map<String, DateTime> idsToAfterDate) { - Set<IdConfig> ids = Sets.newHashSet(); - for(String id : idsToAfterDate.keySet()) { - IdConfig idConfig = new IdConfig(); - idConfig.setId(id); - idConfig.setAfterDate(idsToAfterDate.get(id)); - ids.add(idConfig); - } - this.configuration.setIds(ids); + return new StreamsResultSet(batch); + } + + @Override + public StreamsResultSet readNew(BigInteger sequence) { + return null; + } + + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + return null; + } + + @Override + public void prepare(Object 
configurationObject) { + this.datums = Queues.newLinkedBlockingQueue(); + this.isComplete = new AtomicBoolean(false); + this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + } + + @Override + public void cleanUp() { + ComponentUtils.shutdownExecutor(executor, 5, 5); + executor = null; + } + + /** + * Overrides the ids and addedAfter time in the configuration. + * @param idsToAfterDate idsToAfterDate + */ + public void overrideIds(Map<String, DateTime> idsToAfterDate) { + Set<IdConfig> ids = Sets.newHashSet(); + for (String id : idsToAfterDate.keySet()) { + IdConfig idConfig = new IdConfig(); + idConfig.setId(id); + idConfig.setAfterDate(idsToAfterDate.get(id)); + ids.add(idConfig); } - - @Override - public boolean isRunning() { - if (datums.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) { - LOGGER.info("Completed"); - isComplete.set(true); - LOGGER.info("Exiting"); - } - return !isComplete.get(); + this.configuration.setIds(ids); + } + + @Override + public boolean isRunning() { + if (datums.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) { + LOGGER.info("Completed"); + isComplete.set(true); + LOGGER.info("Exiting"); } + return !isComplete.get(); + } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java index 1262106..3939f23 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java @@ -18,24 +18,24 @@ package org.apache.streams.facebook.provider; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.MoreExecutors; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigRenderOptions; -import facebook4j.*; -import facebook4j.conf.ConfigurationBuilder; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; import org.apache.streams.facebook.FacebookUserInformationConfiguration; import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.MoreExecutors; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigRenderOptions; + +import org.apache.commons.lang.NotImplementedException; import 
org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.commons.lang.NotImplementedException; import java.io.IOException; import java.io.Serializable; @@ -44,262 +44,291 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Queue; -import java.util.concurrent.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -public class FacebookUserInformationProvider implements StreamsProvider, Serializable -{ +import facebook4j.Facebook; +import facebook4j.FacebookException; +import facebook4j.FacebookFactory; +import facebook4j.Friend; +import facebook4j.Paging; +import facebook4j.ResponseList; +import facebook4j.User; +import facebook4j.conf.ConfigurationBuilder; - public static final String STREAMS_ID = "FacebookUserInformationProvider"; - private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserInformationProvider.class); +public class FacebookUserInformationProvider implements StreamsProvider, Serializable { - private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + public static final String STREAMS_ID = "FacebookUserInformationProvider"; - private static final String ALL_PERMISSIONS = 
"ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activities,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login"; - private FacebookUserInformationConfiguration facebookUserInformationConfiguration; + private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserInformationProvider.class); - private Class klass; - protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(); + private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - public FacebookUserInformationConfiguration getConfig() { return facebookUserInformationConfiguration; } + private static final String ALL_PERMISSIONS = 
"ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activities,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login"; + private FacebookUserInformationConfiguration facebookUserInformationConfiguration; - public void setConfig(FacebookUserInformationConfiguration config) { this.facebookUserInformationConfiguration = config; } + private Class klass; + protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(); - protected Iterator<String[]> idsBatches; + public FacebookUserInformationConfiguration getConfig() { + return facebookUserInformationConfiguration; + } - protected ExecutorService executor; + public void setConfig(FacebookUserInformationConfiguration config) { + this.facebookUserInformationConfiguration = config; + } 
- protected DateTime start; - protected DateTime end; + protected Iterator<String[]> idsBatches; - protected final AtomicBoolean running = new AtomicBoolean(); + protected ExecutorService executor; - private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { - return new ThreadPoolExecutor(nThreads, nThreads, - 5000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); - } + protected DateTime start; + protected DateTime end; - public FacebookUserInformationProvider() { - Config config = StreamsConfigurator.config.getConfig("facebook"); - FacebookUserInformationConfiguration facebookUserInformationConfiguration; - try { - facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); - } catch (IOException e) { - e.printStackTrace(); - return; - } - } + protected final AtomicBoolean running = new AtomicBoolean(); - public FacebookUserInformationProvider(FacebookUserInformationConfiguration config) { - this.facebookUserInformationConfiguration = config; - } + private static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) { + return new ThreadPoolExecutor(numThreads, numThreads, + 5000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); + } - public FacebookUserInformationProvider(Class klass) { - Config config = StreamsConfigurator.config.getConfig("facebook"); - FacebookUserInformationConfiguration facebookUserInformationConfiguration; - try { - facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); - } catch (IOException e) { - e.printStackTrace(); - return; - } - this.klass = klass; + /** + * FacebookUserInformationProvider constructor - resolves 
FacebookUserInformationConfiguration from JVM 'facebook'. + */ + public FacebookUserInformationProvider() { + Config config = StreamsConfigurator.config.getConfig("facebook"); + FacebookUserInformationConfiguration facebookUserInformationConfiguration; + try { + facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); + } catch (IOException ex) { + ex.printStackTrace(); + return; } - - public FacebookUserInformationProvider(FacebookUserInformationConfiguration config, Class klass) { - this.facebookUserInformationConfiguration = config; - this.klass = klass; + } + + /** + * FacebookUserInformationProvider constructor - uses supplied FacebookUserInformationConfiguration. + * @param config FacebookUserInformationConfiguration to use + */ + public FacebookUserInformationProvider(FacebookUserInformationConfiguration config) { + this.facebookUserInformationConfiguration = config; + } + + public FacebookUserInformationProvider(Class klass) { + Config config = StreamsConfigurator.config.getConfig("facebook"); + FacebookUserInformationConfiguration facebookUserInformationConfiguration; + try { + facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); + } catch (IOException ex) { + ex.printStackTrace(); + return; } - - public Queue<StreamsDatum> getProviderQueue() { - return this.providerQueue; + this.klass = klass; + } + + public FacebookUserInformationProvider(FacebookUserInformationConfiguration config, Class klass) { + this.facebookUserInformationConfiguration = config; + this.klass = klass; + } + + public Queue<StreamsDatum> getProviderQueue() { + return this.providerQueue; + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void startStream() { + running.set(true); + } + + @Override + public StreamsResultSet readCurrent() { + + 
Preconditions.checkArgument(idsBatches.hasNext()); + + 
LOGGER.info("readCurrent"); + + Facebook client = getFacebookClient(); + + try { + User me = client.users().getMe(); + String json = mapper.writeValueAsString(me); + providerQueue.add( + new StreamsDatum(json, DateTime.now()) + ); + } catch (JsonProcessingException ex) { + ex.printStackTrace(); + } catch (FacebookException ex) { + ex.printStackTrace(); } - @Override - public String getId() { - return STREAMS_ID; - } - - @Override - public void startStream() { - running.set(true); - } - - public StreamsResultSet readCurrent() { - - Preconditions.checkArgument(idsBatches.hasNext()); - - LOGGER.info("readCurrent"); - - Facebook client = getFacebookClient(); - + if (idsBatches.hasNext()) { + while (idsBatches.hasNext()) { try { - User me = client.users().getMe(); - String json = mapper.writeValueAsString(me); - providerQueue.add( - new StreamsDatum(json, DateTime.now()) - ); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } catch (FacebookException e) { - e.printStackTrace(); - } + List<User> userList = client.users().getUsers(idsBatches.next()); + for (User user : userList) { - if( idsBatches.hasNext()) { - while (idsBatches.hasNext()) { - try { - List<User> userList = client.users().getUsers(idsBatches.next()); - for (User user : userList) { - - try { - String json = mapper.writeValueAsString(user); - providerQueue.add( - new StreamsDatum(json, DateTime.now()) - ); - } catch (JsonProcessingException e) { - // e.printStackTrace(); - } - } - - } catch (FacebookException e) { - e.printStackTrace(); - } - } - } else { try { - ResponseList<Friend> friendResponseList = client.friends().getFriends(); - Paging<Friend> friendPaging; - do { - - for( Friend friend : friendResponseList ) { - - String json; - try { - json = mapper.writeValueAsString(friend); - providerQueue.add( - new StreamsDatum(json) - ); - } catch (JsonProcessingException e) { -// e.printStackTrace(); - } - } - friendPaging = friendResponseList.getPaging(); - friendResponseList = 
client.fetchNext(friendPaging); - } while( friendPaging != null && - friendResponseList != null ); - } catch (FacebookException e) { - e.printStackTrace(); + String json = mapper.writeValueAsString(user); + providerQueue.add( + new StreamsDatum(json, DateTime.now()) + ); + } catch (JsonProcessingException ex) { + LOGGER.trace("JsonProcessingException", ex); } + } + } catch (FacebookException ex) { + ex.printStackTrace(); } + } + } else { + try { + ResponseList<Friend> friendResponseList = client.friends().getFriends(); + Paging<Friend> friendPaging; + do { - LOGGER.info("Finished. Cleaning up..."); - - LOGGER.info("Providing {} docs", providerQueue.size()); - - StreamsResultSet result = new StreamsResultSet(providerQueue); - running.set(false); + for ( Friend friend : friendResponseList ) { - LOGGER.info("Exiting"); - - return result; - - } - - public StreamsResultSet readNew(BigInteger sequence) { - LOGGER.debug("{} readNew", STREAMS_ID); - throw new NotImplementedException(); - } - - public StreamsResultSet readRange(DateTime start, DateTime end) { - LOGGER.debug("{} readRange", STREAMS_ID); - this.start = start; - this.end = end; - readCurrent(); - StreamsResultSet result = (StreamsResultSet)providerQueue.iterator(); - return result; - } + String json; + try { + json = mapper.writeValueAsString(friend); + providerQueue.add( + new StreamsDatum(json) + ); + } catch (JsonProcessingException ex) { + LOGGER.trace("JsonProcessingException", ex); + } + } + friendPaging = friendResponseList.getPaging(); + friendResponseList = client.fetchNext(friendPaging); + } + while ( friendPaging != null + && + friendResponseList != null ); + } catch (FacebookException ex) { + ex.printStackTrace(); + } - @Override - public boolean isRunning() { - return running.get(); } - void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(10, 
TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) - System.err.println("Pool did not terminate"); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); + LOGGER.info("Finished. Cleaning up..."); + + LOGGER.info("Providing {} docs", providerQueue.size()); + + StreamsResultSet result = new StreamsResultSet(providerQueue); + running.set(false); + + LOGGER.info("Exiting"); + + return result; + + } + + @Override + public StreamsResultSet readNew(BigInteger sequence) { + LOGGER.debug("{} readNew", STREAMS_ID); + throw new NotImplementedException(); + } + + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + LOGGER.debug("{} readRange", STREAMS_ID); + this.start = start; + this.end = end; + readCurrent(); + StreamsResultSet result = (StreamsResultSet)providerQueue.iterator(); + return result; + } + + @Override + public boolean isRunning() { + return running.get(); + } + + void shutdownAndAwaitTermination(ExecutorService pool) { + pool.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + pool.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + System.err.println("Pool did not terminate"); } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); } + } - @Override - public void prepare(Object o) { - - executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); + @Override + public void prepare(Object 
configurationObject) { - Preconditions.checkNotNull(providerQueue); - Preconditions.checkNotNull(this.klass); - Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppId()); - Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppSecret()); - Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getUserAccessToken()); - Preconditions.checkNotNull(facebookUserInformationConfiguration.getInfo()); + executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); - List<String> ids = new ArrayList<String>(); - List<String[]> idsBatches = new ArrayList<String[]>(); + Preconditions.checkNotNull(providerQueue); + Preconditions.checkNotNull(this.klass); + Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppId()); + Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppSecret()); + Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getUserAccessToken()); + Preconditions.checkNotNull(facebookUserInformationConfiguration.getInfo()); - for(String s : facebookUserInformationConfiguration.getInfo()) { - if(s != null) - { - ids.add(s); + List<String> ids = new ArrayList<String>(); + List<String[]> idsBatches = new ArrayList<String[]>(); - if(ids.size() >= 100) { - // add the batch - idsBatches.add(ids.toArray(new String[ids.size()])); - // reset the Ids - ids = new ArrayList<String>(); - } + for (String s : facebookUserInformationConfiguration.getInfo()) { + if (s != null) { + ids.add(s); - } + if (ids.size() >= 100) { + // add the batch + idsBatches.add(ids.toArray(new String[ids.size()])); + // reset the Ids + ids = new ArrayList<String>(); } - if(ids.size() > 0) - idsBatches.add(ids.toArray(new String[ids.size()])); - - this.idsBatches = idsBatches.iterator(); + } } - protected Facebook getFacebookClient() - { - ConfigurationBuilder cb = new ConfigurationBuilder(); - cb.setDebugEnabled(true) - 
.setOAuthAppId(facebookUserInformationConfiguration.getOauth().getAppId()) - .setOAuthAppSecret(facebookUserInformationConfiguration.getOauth().getAppSecret()) - .setOAuthAccessToken(facebookUserInformationConfiguration.getOauth().getUserAccessToken()) - .setOAuthPermissions(ALL_PERMISSIONS) - .setJSONStoreEnabled(true) - .setClientVersion("v1.0"); - - FacebookFactory ff = new FacebookFactory(cb.build()); - Facebook facebook = ff.getInstance(); - - return facebook; + if (ids.size() > 0) { + idsBatches.add(ids.toArray(new String[ids.size()])); } - @Override - public void cleanUp() { - shutdownAndAwaitTermination(executor); - } + this.idsBatches = idsBatches.iterator(); + } + + protected Facebook getFacebookClient() { + ConfigurationBuilder cb = new ConfigurationBuilder(); + cb.setDebugEnabled(true) + .setOAuthAppId(facebookUserInformationConfiguration.getOauth().getAppId()) + .setOAuthAppSecret(facebookUserInformationConfiguration.getOauth().getAppSecret()) + .setOAuthAccessToken(facebookUserInformationConfiguration.getOauth().getUserAccessToken()) + .setOAuthPermissions(ALL_PERMISSIONS) + .setJSONStoreEnabled(true) + .setClientVersion("v1.0"); + + FacebookFactory ff = new FacebookFactory(cb.build()); + Facebook facebook = ff.getInstance(); + + return facebook; + } + + @Override + public void cleanUp() { + shutdownAndAwaitTermination(executor); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java index 0f2121a..b292d30 100644 --- 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java @@ -18,18 +18,6 @@ package org.apache.streams.facebook.provider; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.collect.Queues; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigRenderOptions; -import facebook4j.*; -import facebook4j.Post; -import facebook4j.conf.ConfigurationBuilder; -import facebook4j.json.DataObjectFactory; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.DatumStatusCounter; import org.apache.streams.core.StreamsDatum; @@ -39,10 +27,20 @@ import org.apache.streams.facebook.FacebookUserInformationConfiguration; import org.apache.streams.facebook.FacebookUserstreamConfiguration; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.util.ComponentUtils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Queues; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigRenderOptions; + +import org.apache.commons.lang.NotImplementedException; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.commons.lang.NotImplementedException; import java.io.IOException; import java.io.Serializable; @@ -51,276 +49,306 @@ import java.util.ArrayList; import java.util.List; import 
java.util.Queue; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import facebook4j.Facebook; +import facebook4j.FacebookException; +import facebook4j.FacebookFactory; +import facebook4j.Post; +import facebook4j.ResponseList; +import facebook4j.conf.ConfigurationBuilder; +import facebook4j.json.DataObjectFactory; + public class FacebookUserstreamProvider implements StreamsProvider, Serializable { - public static final String STREAMS_ID = "FacebookUserstreamProvider"; + public static final String STREAMS_ID = "FacebookUserstreamProvider"; - private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserstreamProvider.class); + private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserstreamProvider.class); - private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - private static final String ALL_PERMISSIONS = "read_stream"; - private FacebookUserstreamConfiguration configuration; + private static final String ALL_PERMISSIONS = "read_stream"; + private FacebookUserstreamConfiguration configuration; - private Class klass; - protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + private Class klass; + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(); + protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(); - public FacebookUserstreamConfiguration getConfig() { - return configuration; - 
} + public FacebookUserstreamConfiguration getConfig() { + return configuration; + } - public void setConfig(FacebookUserstreamConfiguration config) { - this.configuration = config; - } + public void setConfig(FacebookUserstreamConfiguration config) { + this.configuration = config; + } - protected ListeningExecutorService executor; + protected ListeningExecutorService executor; - protected DateTime start; - protected DateTime end; + protected DateTime start; + protected DateTime end; - protected final AtomicBoolean running = new AtomicBoolean(); + protected final AtomicBoolean running = new AtomicBoolean(); - private DatumStatusCounter countersCurrent = new DatumStatusCounter(); - private DatumStatusCounter countersTotal = new DatumStatusCounter(); + private DatumStatusCounter countersCurrent = new DatumStatusCounter(); + private DatumStatusCounter countersTotal = new DatumStatusCounter(); - protected Facebook client; + protected Facebook client; - private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { - return new ThreadPoolExecutor(nThreads, nThreads, - 5000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); - } + private static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) { + return new ThreadPoolExecutor(numThreads, numThreads, + 5000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); + } - public FacebookUserstreamProvider() { - Config config = StreamsConfigurator.config.getConfig("facebook"); - FacebookUserInformationConfiguration facebookUserInformationConfiguration; - try { - facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); - } catch (IOException e) { - e.printStackTrace(); - return; - } + /** + * FacebookUserstreamProvider constructor. 
+ */ + public FacebookUserstreamProvider() { + Config config = StreamsConfigurator.config.getConfig("facebook"); + FacebookUserInformationConfiguration facebookUserInformationConfiguration; + try { + facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); + } catch (IOException ex) { + ex.printStackTrace(); + return; } - - public FacebookUserstreamProvider(FacebookUserstreamConfiguration config) { - this.configuration = config; + } + + /** + * FacebookUserstreamProvider constructor. + * @param config config + */ + public FacebookUserstreamProvider(FacebookUserstreamConfiguration config) { + this.configuration = config; + } + + /** + * FacebookUserstreamProvider constructor. + * @param klass output Class + */ + public FacebookUserstreamProvider(Class klass) { + Config config = StreamsConfigurator.config.getConfig("facebook"); + FacebookUserInformationConfiguration facebookUserInformationConfiguration; + try { + facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); + } catch (IOException ex) { + ex.printStackTrace(); + return; } - - public FacebookUserstreamProvider(Class klass) { - Config config = StreamsConfigurator.config.getConfig("facebook"); - FacebookUserInformationConfiguration facebookUserInformationConfiguration; - try { - facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); - } catch (IOException e) { - e.printStackTrace(); - return; - } - this.klass = klass; + this.klass = klass; + } + + public FacebookUserstreamProvider(FacebookUserstreamConfiguration config, Class klass) { + this.configuration = config; + this.klass = klass; + } + + public Queue<StreamsDatum> getProviderQueue() { + return this.providerQueue; + } + + @Override + public String getId() { + return 
STREAMS_ID; + } + + @Override + public void startStream() { + + client = getFacebookClient(); + + if ( configuration.getInfo() != null + && + configuration.getInfo().size() > 0 ) { + for ( String id : configuration.getInfo()) { + executor.submit(new FacebookFeedPollingTask(this, id)); + } + running.set(true); + } else { + try { + String id = client.getMe().getId(); + executor.submit(new FacebookFeedPollingTask(this, id)); + running.set(true); + } catch (FacebookException ex) { + LOGGER.error(ex.getMessage()); + running.set(false); + } } + } - public FacebookUserstreamProvider(FacebookUserstreamConfiguration config, Class klass) { - this.configuration = config; - this.klass = klass; - } + @Override + public StreamsResultSet readCurrent() { - public Queue<StreamsDatum> getProviderQueue() { - return this.providerQueue; - } + StreamsResultSet current; - @Override - public String getId() { - return STREAMS_ID; + synchronized (FacebookUserstreamProvider.class) { + current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue)); + current.setCounter(new DatumStatusCounter()); + current.getCounter().add(countersCurrent); + countersTotal.add(countersCurrent); + countersCurrent = new DatumStatusCounter(); + providerQueue.clear(); } - @Override - public void startStream() { - - client = getFacebookClient(); - - if( configuration.getInfo() != null && - configuration.getInfo().size() > 0 ) { - for( String id : configuration.getInfo()) { - executor.submit(new FacebookFeedPollingTask(this, id)); - } - running.set(true); - } else { - try { - String id = client.getMe().getId(); - executor.submit(new FacebookFeedPollingTask(this, id)); - running.set(true); - } catch (FacebookException e) { - LOGGER.error(e.getMessage()); - running.set(false); - } + return current; + + } + + @Override + public StreamsResultSet readNew(BigInteger sequence) { + LOGGER.debug("{} readNew", STREAMS_ID); + throw new NotImplementedException(); + } + + @Override + public StreamsResultSet 
readRange(DateTime start, DateTime end) { + LOGGER.debug("{} readRange", STREAMS_ID); + this.start = start; + this.end = end; + readCurrent(); + StreamsResultSet result = (StreamsResultSet) providerQueue.iterator(); + return result; + } + + @Override + public boolean isRunning() { + return running.get(); + } + + void shutdownAndAwaitTermination(ExecutorService pool) { + pool.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + pool.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + System.err.println("Pool did not terminate"); } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); } + } - public StreamsResultSet readCurrent() { + @Override + public void prepare(Object configurationObject) { - StreamsResultSet current; + executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); - synchronized (FacebookUserstreamProvider.class) { - current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue)); - current.setCounter(new DatumStatusCounter()); - current.getCounter().add(countersCurrent); - countersTotal.add(countersCurrent); - countersCurrent = new DatumStatusCounter(); - providerQueue.clear(); - } + Preconditions.checkNotNull(providerQueue); + Preconditions.checkNotNull(this.klass); + Preconditions.checkNotNull(configuration.getOauth().getAppId()); + Preconditions.checkNotNull(configuration.getOauth().getAppSecret()); + Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken()); - return current; + client = getFacebookClient(); - } + if ( configuration.getInfo() != null + && + configuration.getInfo().size() > 0 ) { - public StreamsResultSet 
readNew(BigInteger sequence) { - LOGGER.debug("{} readNew", STREAMS_ID); - throw new NotImplementedException(); - } + List<String> ids = new ArrayList<String>(); + List<String[]> idsBatches = new ArrayList<String[]>(); - public StreamsResultSet readRange(DateTime start, DateTime end) { - LOGGER.debug("{} readRange", STREAMS_ID); - this.start = start; - this.end = end; - readCurrent(); - StreamsResultSet result = (StreamsResultSet) providerQueue.iterator(); - return result; - } + for (String s : configuration.getInfo()) { + if (s != null) { + ids.add(s); - @Override - public boolean isRunning() { - return running.get(); - } + if (ids.size() >= 100) { + // add the batch + idsBatches.add(ids.toArray(new String[ids.size()])); + // reset the Ids + ids = new ArrayList<String>(); + } - void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) - System.err.println("Pool did not terminate"); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); } + } } + } - @Override - public void prepare(Object o) { - - executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); + protected Facebook getFacebookClient() { + ConfigurationBuilder cb = new ConfigurationBuilder(); + cb.setDebugEnabled(true) + .setOAuthAppId(configuration.getOauth().getAppId()) + .setOAuthAppSecret(configuration.getOauth().getAppSecret()) + .setOAuthAccessToken(configuration.getOauth().getUserAccessToken()) + .setOAuthPermissions(ALL_PERMISSIONS) + .setJSONStoreEnabled(true); - Preconditions.checkNotNull(providerQueue); - 
Preconditions.checkNotNull(this.klass); - Preconditions.checkNotNull(configuration.getOauth().getAppId()); - Preconditions.checkNotNull(configuration.getOauth().getAppSecret()); - Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken()); + FacebookFactory ff = new FacebookFactory(cb.build()); + Facebook facebook = ff.getInstance(); - client = getFacebookClient(); + return facebook; + } - if( configuration.getInfo() != null && - configuration.getInfo().size() > 0 ) { + @Override + public void cleanUp() { + shutdownAndAwaitTermination(executor); + } - List<String> ids = new ArrayList<String>(); - List<String[]> idsBatches = new ArrayList<String[]>(); + private class FacebookFeedPollingTask implements Runnable { - for (String s : configuration.getInfo()) { - if (s != null) { - ids.add(s); + FacebookUserstreamProvider provider; + Facebook client; + String id; - if (ids.size() >= 100) { - // add the batch - idsBatches.add(ids.toArray(new String[ids.size()])); - // reset the Ids - ids = new ArrayList<String>(); - } + private Set<Post> priorPollResult = Sets.newHashSet(); - } - } - } + public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) { + this.provider = facebookUserstreamProvider; } - protected Facebook getFacebookClient() { - ConfigurationBuilder cb = new ConfigurationBuilder(); - cb.setDebugEnabled(true) - .setOAuthAppId(configuration.getOauth().getAppId()) - .setOAuthAppSecret(configuration.getOauth().getAppSecret()) - .setOAuthAccessToken(configuration.getOauth().getUserAccessToken()) - .setOAuthPermissions(ALL_PERMISSIONS) - .setJSONStoreEnabled(true); - - FacebookFactory ff = new FacebookFactory(cb.build()); - Facebook facebook = ff.getInstance(); - - return facebook; + public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider, String id) { + this.provider = facebookUserstreamProvider; + this.client = provider.client; + this.id = id; } @Override - public void cleanUp() { - 
shutdownAndAwaitTermination(executor); - } - - private class FacebookFeedPollingTask implements Runnable { - - FacebookUserstreamProvider provider; - Facebook client; - String id; - - private Set<Post> priorPollResult = Sets.newHashSet(); - - public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) { - this.provider = facebookUserstreamProvider; - } - - public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider, String id) { - this.provider = facebookUserstreamProvider; - this.client = provider.client; - this.id = id; - } - @Override - public void run() { - while (provider.isRunning()) { - ResponseList<Post> postResponseList; - try { - postResponseList = client.getFeed(id); - - Set<Post> update = Sets.newHashSet(postResponseList); - Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update)); - Set<Post> entrySet = Sets.difference(update, repeats); - LOGGER.debug(this.id + " response: " + update.size() + " previous: " + repeats.size() + " new: " + entrySet.size()); - for (Post item : entrySet) { - String json = DataObjectFactory.getRawJSON(item); - org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class); - try { - lock.readLock().lock(); - ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue); - countersCurrent.incrementAttempt(); - } finally { - lock.readLock().unlock(); - } - } - priorPollResult = update; - } catch (Exception e) { - e.printStackTrace(); - } finally { - try { - Thread.sleep(configuration.getPollIntervalMillis()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } + public void run() { + while (provider.isRunning()) { + ResponseList<Post> postResponseList; + try { + postResponseList = client.getFeed(id); + + Set<Post> update = Sets.newHashSet(postResponseList); + Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update)); + Set<Post> entrySet = 
Sets.difference(update, repeats); + LOGGER.debug(this.id + " response: " + update.size() + " previous: " + repeats.size() + " new: " + entrySet.size()); + for (Post item : entrySet) { + String json = DataObjectFactory.getRawJSON(item); + org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class); + try { + lock.readLock().lock(); + ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue); + countersCurrent.incrementAttempt(); + } finally { + lock.readLock().unlock(); } + } + priorPollResult = update; + } catch (Exception ex) { + ex.printStackTrace(); + } finally { + try { + Thread.sleep(configuration.getPollIntervalMillis()); + } catch (InterruptedException interrupt) { + Thread.currentThread().interrupt(); + } } + } } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java index 0e88dd4..68d8f06 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java @@ -20,7 +20,6 @@ package org.apache.streams.facebook.provider.page; import org.apache.streams.core.StreamsDatum; import org.apache.streams.facebook.FacebookConfiguration; -import org.apache.streams.facebook.FacebookPageProviderConfiguration; import org.apache.streams.facebook.IdConfig; import org.apache.streams.facebook.provider.FacebookDataCollector; import 
org.apache.streams.jackson.StreamsJacksonMapper; @@ -28,7 +27,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,11 +34,10 @@ import java.util.concurrent.BlockingQueue; import facebook4j.FacebookException; import facebook4j.Page; -import facebook4j.Reading; import facebook4j.json.DataObjectFactory; /** - * Collects the page data from public Facebook pages + * Collects the page data from public Facebook pages. */ public class FacebookPageDataCollector extends FacebookDataCollector { @@ -48,11 +45,8 @@ public class FacebookPageDataCollector extends FacebookDataCollector { private static final int MAX_ATTEMPTS = 5; private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - private String fields; - - public FacebookPageDataCollector(BlockingQueue<StreamsDatum> queue, FacebookPageProviderConfiguration configuration) { + public FacebookPageDataCollector(BlockingQueue<StreamsDatum> queue, FacebookConfiguration configuration) { super(configuration, queue); - fields = StringUtils.join(configuration.getFields(), ','); } @Override @@ -70,7 +64,7 @@ public class FacebookPageDataCollector extends FacebookDataCollector { while (attempt < MAX_ATTEMPTS) { ++attempt; try { - Page page = getNextFacebookClient().getPage(pageId, new Reading().fields(fields)); + Page page = getNextFacebookClient().getPage(pageId); return page; } catch (FacebookException fe) { LOGGER.error("Facebook returned an exception : {}", fe); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java ---------------------------------------------------------------------- diff --git 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java index d11a486..e7bbcfa 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java @@ -80,9 +80,8 @@ public class FacebookPageProvider extends FacebookProvider { private FacebookPageProviderConfiguration configuration; - public FacebookPageProvider(FacebookPageProviderConfiguration facebookConfiguration) { + public FacebookPageProvider(FacebookConfiguration facebookConfiguration) { super(facebookConfiguration); - configuration = facebookConfiguration; } @VisibleForTesting @@ -92,7 +91,7 @@ public class FacebookPageProvider extends FacebookProvider { @Override protected FacebookDataCollector getDataCollector() { - return new FacebookPageDataCollector(super.datums, configuration); + return new FacebookPageDataCollector(super.datums, super.configuration); } /** @@ -115,7 +114,7 @@ public class FacebookPageProvider extends FacebookProvider { Config typesafe = conf.withFallback(reference).resolve(); StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); - FacebookPageProviderConfiguration config = new ComponentConfigurator<>(FacebookPageProviderConfiguration.class).detectConfiguration(typesafe, "facebook"); + FacebookConfiguration config = new ComponentConfigurator<>(FacebookConfiguration.class).detectConfiguration(typesafe, "facebook"); FacebookPageProvider provider = new FacebookPageProvider(config); PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java index c2ba700..f509170 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java @@ -15,116 +15,124 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.facebook.provider.pagefeed; -import com.fasterxml.jackson.databind.ObjectMapper; -import facebook4j.*; -import facebook4j.json.DataObjectFactory; import org.apache.streams.core.StreamsDatum; import org.apache.streams.facebook.FacebookConfiguration; import org.apache.streams.facebook.IdConfig; import org.apache.streams.facebook.provider.FacebookDataCollector; import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.fasterxml.jackson.databind.ObjectMapper; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.BlockingQueue; +import facebook4j.FacebookException; +import facebook4j.Paging; +import facebook4j.Post; +import facebook4j.Reading; +import facebook4j.ResponseList; +import facebook4j.json.DataObjectFactory; + /** - * Collects the page feed data from public Facebook pages + * Collects the page feed data from public Facebook pages. 
*/ public class FacebookPageFeedDataCollector extends FacebookDataCollector { - private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageFeedDataCollector.class); - private static final int MAX_ATTEMPTS = 5; - private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - private static final int LIMIT = 100; + private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageFeedDataCollector.class); + private static final int MAX_ATTEMPTS = 5; + private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + private static final int LIMIT = 100; - public FacebookPageFeedDataCollector(BlockingQueue<StreamsDatum> queue, FacebookConfiguration configuration) { - super(configuration, queue); - } + public FacebookPageFeedDataCollector(BlockingQueue<StreamsDatum> queue, FacebookConfiguration configuration) { + super(configuration, queue); + } + + @Override + protected void getData(IdConfig id) throws Exception { + boolean exit = false; - @Override - protected void getData(IdConfig id) throws Exception { - boolean exit = false; + ResponseList<Post> facebookPosts = getPosts(id.getId()); + LOGGER.debug("Post received : {}", facebookPosts.size()); + backOff.reset(); + do { + for (Post post : facebookPosts) { + if (id.getBeforeDate() != null && id.getAfterDate() != null) { + if (id.getBeforeDate().isAfter(post.getCreatedTime().getTime()) + && id.getAfterDate().isBefore(post.getCreatedTime().getTime())) { + super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId()); - ResponseList<Post> facebookPosts = getPosts(id.getId()); - LOGGER.debug("Post received : {}", facebookPosts.size()); + } + } else if (id.getBeforeDate() != null && id.getBeforeDate().isAfter(post.getCreatedTime().getTime())) { + super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId()); + } else if (id.getAfterDate() != null 
&& id.getAfterDate().isBefore(post.getCreatedTime().getTime())) { + super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId()); + } else if (id.getBeforeDate() == null && id.getAfterDate() == null) { + super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId()); + } else { + exit = true; + LOGGER.debug("Breaking on post, {}, with createdAtDate {}", post.getId(), post.getCreatedTime()); + break; + } + } + if (facebookPosts.getPaging() != null && !exit) { + LOGGER.debug("Paging. . ."); + facebookPosts = getPosts(facebookPosts.getPaging()); backOff.reset(); - do { - for(Post post : facebookPosts) { - if(id.getBeforeDate() != null && id.getAfterDate() != null) { - if(id.getBeforeDate().isAfter(post.getCreatedTime().getTime()) - && id.getAfterDate().isBefore(post.getCreatedTime().getTime())) { - super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId()); - - } - } else if(id.getBeforeDate() != null && id.getBeforeDate().isAfter(post.getCreatedTime().getTime())) { - super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId()); - } else if(id.getAfterDate() != null && id.getAfterDate().isBefore(post.getCreatedTime().getTime())) { - super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId()); - } else if(id.getBeforeDate() == null && id.getAfterDate() == null) { - super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId()); - } else { - exit = true; - LOGGER.debug("Breaking on post, {}, with createdAtDate {}", post.getId(), post.getCreatedTime()); - break; - } - } - if(facebookPosts.getPaging() != null && !exit) { - LOGGER.debug("Paging. . 
."); - facebookPosts = getPosts(facebookPosts.getPaging()); - backOff.reset(); - LOGGER.debug("Paging received {} posts*", facebookPosts.size()); - } else { - LOGGER.debug("No more paging."); - facebookPosts = null; - } - } while(facebookPosts != null && facebookPosts.size() != 0); + LOGGER.debug("Paging received {} posts*", facebookPosts.size()); + } else { + LOGGER.debug("No more paging."); + facebookPosts = null; + } + } + while (facebookPosts != null && facebookPosts.size() != 0); + } - } + private ResponseList<Post> getPosts(Paging<Post> paging) throws Exception { + return getPosts(null, paging); + } - private ResponseList<Post> getPosts(Paging<Post> paging) throws Exception{ - return getPosts(null, paging); - } + private ResponseList<Post> getPosts(String pageId) throws Exception { + return getPosts(pageId, null); + } - private ResponseList<Post> getPosts(String pageId) throws Exception { - return getPosts(pageId, null); - } + /** + * Queries facebook. Attempts requests up to 5 times and backs off on each facebook exception. + * @param pageId pageId + * @param paging paging + * @return ResponseList of $link{facebook4j.Post} + * @throws Exception Exception + */ + private ResponseList<Post> getPosts(String pageId, Paging<Post> paging) throws Exception { + int attempt = 0; + while (attempt < MAX_ATTEMPTS) { + ++attempt; + try { + if (pageId != null) { + Reading reading = new Reading(); + reading.limit(LIMIT); + return getNextFacebookClient().getPosts(pageId, reading); + } else { + return getNextFacebookClient().fetchNext(paging); + } + } catch (FacebookException fe) { + LOGGER.error("Facebook returned an exception : {}", fe); + LOGGER.error("Facebook returned an exception while trying to get feed for page, {} : {}", pageId, fe.getMessage()); + //TODO Rate limit exceptions with facebook4j unclear http://facebook4j.org/oldjavadocs/1.1.12-2.0.0/2.0.0/index.html?facebook4j/internal/http/HttpResponseCode.html + // back off at all exceptions until figured out. 
+ int errorCode = fe.getErrorCode(); - /** - * Queries facebook. Attempts requests up to 5 times and backs off on each facebook exception. - * @param pageId - * @param paging - * @return - * @throws Exception - */ - private ResponseList<Post> getPosts(String pageId, Paging<Post> paging) throws Exception { - int attempt = 0; - while(attempt < MAX_ATTEMPTS) { - ++attempt; - try { - if (pageId != null) { - Reading reading = new Reading(); - reading.limit(LIMIT); - return getNextFacebookClient().getPosts(pageId, reading); - } - else - return getNextFacebookClient().fetchNext(paging); - } catch (FacebookException fe) { - LOGGER.error("Facebook returned an exception : {}", fe); - LOGGER.error("Facebook returned an exception while trying to get feed for page, {} : {}", pageId, fe.getMessage()); - //TODO Rate limit exceptions with facebook4j unclear http://facebook4j.org/oldjavadocs/1.1.12-2.0.0/2.0.0/index.html?facebook4j/internal/http/HttpResponseCode.html - // back off at all exceptions until figured out. 
- int errorCode = fe.getErrorCode(); - - //Some sort of rate limiting - if(errorCode == 17 || errorCode == 4 || errorCode == 341) { - super.backOff.backOff(); - } - } + //Some sort of rate limiting + if (errorCode == 17 || errorCode == 4 || errorCode == 341) { + super.backOff.backOff(); } - throw new Exception("Failed to get data from facebook after "+MAX_ATTEMPTS); + } } + throw new Exception("Failed to get data from facebook after " + MAX_ATTEMPTS); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java index 308b129..5d977e0 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java @@ -15,15 +15,9 @@ * specific language governing permissions and limitations * under the License. 
*/ + package org.apache.streams.facebook.provider.pagefeed; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Uninterruptibles; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.config.StreamsConfigurator; @@ -33,6 +27,15 @@ import org.apache.streams.facebook.provider.FacebookDataCollector; import org.apache.streams.facebook.provider.FacebookProvider; import org.apache.streams.facebook.provider.page.FacebookPageProvider; import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,65 +47,71 @@ import java.util.Iterator; import java.util.concurrent.TimeUnit; /** - * + * FacebookPageFeedProvider provides content from facebook public page. 
*/ public class FacebookPageFeedProvider extends FacebookProvider { - public static final String STREAMS_ID = "FacebookPageFeedProvider"; - - private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageProvider.class); - - private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - - public FacebookPageFeedProvider() { - super(); - } - - public FacebookPageFeedProvider(FacebookConfiguration config) { - super(config); - } - - @Override - protected FacebookDataCollector getDataCollector() { - return new FacebookPageFeedDataCollector(super.datums, super.configuration); - } - - public static void main(String[] args) throws Exception { - - Preconditions.checkArgument(args.length >= 2); - - String configfile = args[0]; - String outfile = args[1]; - - Config reference = ConfigFactory.load(); - File conf_file = new File(configfile); - assert(conf_file.exists()); - Config conf = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - - Config typesafe = conf.withFallback(reference).resolve(); - - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); - FacebookConfiguration config = new ComponentConfigurator<>(FacebookConfiguration.class).detectConfiguration(typesafe, "facebook"); - FacebookPageFeedProvider provider = new FacebookPageFeedProvider(config); - - PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); - provider.prepare(config); - provider.startStream(); - do { - Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); - while(iterator.hasNext()) { - StreamsDatum datum = iterator.next(); - String json; - try { - json = MAPPER.writeValueAsString(datum.getDocument()); - outStream.println(json); - } catch (JsonProcessingException e) { - System.err.println(e.getMessage()); - } - } - } while( 
provider.isRunning()); - provider.cleanUp(); - outStream.flush(); + public static final String STREAMS_ID = "FacebookPageFeedProvider"; + + private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageProvider.class); + + private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + + public FacebookPageFeedProvider() { + super(); + } + + public FacebookPageFeedProvider(FacebookConfiguration config) { + super(config); + } + + @Override + protected FacebookDataCollector getDataCollector() { + return new FacebookPageFeedDataCollector(super.datums, super.configuration); + } + + /** + * Run FacebookPageFeedProvider from command line. + * @param args configfile outfile + * @throws Exception Exception + */ + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File confFile = new File(configfile); + assert (confFile.exists()); + Config conf = ConfigFactory.parseFileAnySyntax(confFile, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = conf.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + FacebookConfiguration config = new ComponentConfigurator<>(FacebookConfiguration.class).detectConfiguration(typesafe, "facebook"); + FacebookPageFeedProvider provider = new FacebookPageFeedProvider(config); + + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); + while (iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; + try { + json = MAPPER.writeValueAsString(datum.getDocument()); 
+ outStream.println(json); + } catch (JsonProcessingException ex) { + System.err.println(ex.getMessage()); + } + } } + while ( provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } }