Hello,
Out of curiosity, I tried to implement the following example in Java,
based on this site:
http://ampcamp.berkeley.edu/3/exercises/realtime-processing-with-spark-streaming.html
Unfortunately, I did not find a recent example of using a Twitter source
in Spark Streaming with Java. The other sources work perfectly well.
I can compile and run the example without problems.
However, I do not receive any tweets. I have already checked the
credentials with another Twitter client I developed, and they work without
any problem. It seems that the Spark job is not connecting to Twitter at
all, because it behaves the same with wrong or correct credentials - there
is no error message, but also no tweets.
I use spark-streaming-twitter_2.10 v0.9.1 (same result with
0.9.0-incubating) and twitter4j 3.0.3, on the Cloudera QuickStart VM
(5.0.0) with Spark 0.9.0.
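For reference, the config.properties file I load contains the four
standard twitter4j OAuth system-property keys (values redacted here):

```properties
# Standard twitter4j OAuth property names; the loader below checks for
# twitter4j.oauth.consumerKey before exporting them as system properties.
twitter4j.oauth.consumerKey=...
twitter4j.oauth.consumerSecret=...
twitter4j.oauth.accessToken=...
twitter4j.oauth.accessTokenSecret=...
```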
Do you have any idea?
Thank you.
Best regards,
Find my source here:
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.twitter.TwitterUtils;

import twitter4j.Status;

public class StreamDriver {

    // Loads the twitter4j OAuth settings from a properties file and
    // exports them as JVM system properties, which twitter4j reads.
    public static boolean loadTwitterCredentials(String configFile) {
        Properties twitterProp = new Properties();
        try (InputStream propertyInputStream = new FileInputStream(configFile)) {
            twitterProp.load(propertyInputStream);
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        if (!twitterProp.containsKey("twitter4j.oauth.consumerKey")) {
            return false;
        }
        // set properties for the twitter api
        for (String currentKey : twitterProp.stringPropertyNames()) {
            System.setProperty(currentKey, twitterProp.getProperty(currentKey));
        }
        return true;
    }

    public static void main(String[] args) {
        // define configuration
        String sparkHome =
            "/opt/cloudera/parcels/CDH-5.0.0-1.cdh5.0.0.p0.47/lib/spark";
        String sparkMasterUrl = "spark://localhost:7077";
        String jarFile = "build/libs/example-spark-streaming-0.1.0.jar";
        String configTwitter = "./config.properties";

        // start program
        System.out.println("Spark Streaming Demonstration with Twitter");
        if (!loadTwitterCredentials(configTwitter)) {
            System.out.println("Error: Cannot find credentials");
            return;
        }

        JavaStreamingContext ssc = new JavaStreamingContext(sparkMasterUrl,
                "Java Streaming Example", new Duration(1000), sparkHome,
                new String[]{jarFile});
        System.out.println("JavaStreamingContext created");

        JavaDStream<Status> twitterStatus = TwitterUtils.createStream(ssc);
        System.out.println("twitterStatus created");

        JavaDStream<String> statuses = twitterStatus.map(
                new Function<Status, String>() {
                    private static final long serialVersionUID = 1L;
                    public String call(Status status) {
                        return Long.toString(status.getId());
                    }
                }
        );
        statuses.print();

        // ssc.checkpoint("/tmp");
        ssc.start();
        ssc.awaitTermination();
    }
}
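The source above relies on JVM system properties being visible to the
Twitter receiver, which runs on an executor rather than in the driver. As
a point of comparison, twitter4j also allows passing the OAuth
credentials explicitly; a minimal sketch (class and method names are my
own, and this is untested against this exact setup):

```java
import java.util.Properties;

import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.twitter.TwitterUtils;

import twitter4j.Status;
import twitter4j.auth.OAuthAuthorization;
import twitter4j.conf.ConfigurationBuilder;

public class ExplicitAuthExample {
    // Builds a twitter4j Authorization from the loaded properties and
    // hands it to TwitterUtils directly, so the receiver does not depend
    // on system properties set only in the driver JVM.
    public static JavaDStream<Status> createTwitterStream(
            JavaStreamingContext ssc, Properties p) {
        ConfigurationBuilder cb = new ConfigurationBuilder()
            .setOAuthConsumerKey(p.getProperty("twitter4j.oauth.consumerKey"))
            .setOAuthConsumerSecret(p.getProperty("twitter4j.oauth.consumerSecret"))
            .setOAuthAccessToken(p.getProperty("twitter4j.oauth.accessToken"))
            .setOAuthAccessTokenSecret(p.getProperty("twitter4j.oauth.accessTokenSecret"));
        OAuthAuthorization auth = new OAuthAuthorization(cb.build());
        return TwitterUtils.createStream(ssc, auth);
    }
}
```

This uses the TwitterUtils.createStream(ssc, authorization) overload; the
serialized Authorization travels with the receiver to the executor.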
Find the log here:
Spark Streaming Demonstration with Twitter
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.0.0-1.cdh5.0.0.p0.47/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.0.0-1.cdh5.0.0.p0.47/lib/spark/assembly/lib/spark-assembly_2.10-0.9.0-cdh5.0.0-hadoop2.3.0-cdh5.0.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/08/10 14:12:22 WARN util.Utils: Your hostname, localhost.localdomain
resolves to a loopback address: 127.0.0.1; using 192.168.122.27 instead (on
interface eth1)
14/08/10 14:12:22 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind
to another address
14/08/10 14:12:22 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/08/10 14:12:22 INFO Remoting: Starting remoting
14/08/10 14:12:23 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://[email protected]:52642]
14/08/10 14:12:23 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://[email protected]:52642]
14/08/10 14:12:23 INFO spark.SparkEnv: Registering BlockManagerMaster
14/08/10 14:12:23 INFO storage.DiskBlockManager: Created local directory at
/tmp/spark-local-20140810141223-b2b1
14/08/10 14:12:23 INFO storage.MemoryStore: MemoryStore started with
capacity 512.1 MB.
14/08/10 14:12:23 INFO network.ConnectionManager: Bound socket to port
41173 with id = ConnectionManagerId(192.168.122.27,41173)
14/08/10 14:12:23 INFO storage.BlockManagerMaster: Trying to register
BlockManager
14/08/10 14:12:23 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
Registering block manager 192.168.122.27:41173 with 512.1 MB RAM
14/08/10 14:12:23 INFO storage.BlockManagerMaster: Registered BlockManager
14/08/10 14:12:23 INFO spark.HttpServer: Starting HTTP Server
14/08/10 14:12:23 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/08/10 14:12:23 INFO server.AbstractConnector: Started
[email protected]:45360
14/08/10 14:12:23 INFO broadcast.HttpBroadcast: Broadcast server started at
http://192.168.122.27:45360
14/08/10 14:12:23 INFO spark.SparkEnv: Registering MapOutputTracker
14/08/10 14:12:23 INFO spark.HttpFileServer: HTTP File server directory is
/tmp/spark-2a406425-7dd1-4cd9-8dd5-792a3efac179
14/08/10 14:12:23 INFO spark.HttpServer: Starting HTTP Server
14/08/10 14:12:23 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/08/10 14:12:23 INFO server.AbstractConnector: Started
[email protected]:33860
14/08/10 14:12:23 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/08/10 14:12:23 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/08/10 14:12:23 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/storage,null}
14/08/10 14:12:23 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/stages/stage,null}
14/08/10 14:12:23 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/stages/pool,null}
14/08/10 14:12:23 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/stages,null}
14/08/10 14:12:23 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/environment,null}
14/08/10 14:12:23 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/executors,null}
14/08/10 14:12:23 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/metrics/json,null}
14/08/10 14:12:23 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/static,null}
14/08/10 14:12:23 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/,null}
14/08/10 14:12:23 INFO server.AbstractConnector: Started
[email protected]:4040
14/08/10 14:12:23 INFO ui.SparkUI: Started Spark Web UI at
http://192.168.122.27:4040
14/08/10 14:12:23 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/08/10 14:12:24 INFO spark.SparkContext: Added JAR
build/libs/example-spark-streaming-0.1.0.jar at
http://192.168.122.27:33860/jars/example-spark-streaming-0.1.0.jar with
timestamp 1407705144109
14/08/10 14:12:24 INFO client.AppClient$ClientActor: Connecting to master
spark://localhost:7077...
JavaStreamingContext created
twitterStatus created
14/08/10 14:12:24 INFO scheduler.NetworkInputTracker: NetworkInputTracker
started
14/08/10 14:12:24 INFO spark.SparkContext: Starting job: collect at
NetworkInputTracker.scala:178
14/08/10 14:12:24 INFO scheduler.DAGScheduler: Registering RDD 3
(reduceByKey at NetworkInputTracker.scala:178)
14/08/10 14:12:24 INFO scheduler.DAGScheduler: Got job 0 (collect at
NetworkInputTracker.scala:178) with 20 output partitions (allowLocal=false)
14/08/10 14:12:24 INFO scheduler.DAGScheduler: Final stage: Stage 0
(collect at NetworkInputTracker.scala:178)
14/08/10 14:12:24 INFO scheduler.DAGScheduler: Parents of final stage:
List(Stage 1)
14/08/10 14:12:24 INFO scheduler.DAGScheduler: Missing parents: List(Stage
1)
14/08/10 14:12:24 INFO scheduler.DAGScheduler: Submitting Stage 1
(MapPartitionsRDD[3] at reduceByKey at NetworkInputTracker.scala:178),
which has no missing parents
14/08/10 14:12:24 INFO scheduler.DAGScheduler: Submitting 50 missing tasks
from Stage 1 (MapPartitionsRDD[3] at reduceByKey at
NetworkInputTracker.scala:178)
14/08/10 14:12:24 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0
with 50 tasks
14/08/10 14:12:25 INFO dstream.ForEachDStream: metadataCleanupDelay = 3600
14/08/10 14:12:25 INFO dstream.MappedDStream: metadataCleanupDelay = 3600
14/08/10 14:12:25 INFO twitter.TwitterInputDStream: metadataCleanupDelay =
3600
14/08/10 14:12:25 INFO twitter.TwitterInputDStream: Slide time = 1000 ms
14/08/10 14:12:25 INFO twitter.TwitterInputDStream: Storage level =
StorageLevel(false, false, false, 1)
14/08/10 14:12:25 INFO twitter.TwitterInputDStream: Checkpoint interval =
null
14/08/10 14:12:25 INFO twitter.TwitterInputDStream: Remember duration =
1000 ms
14/08/10 14:12:25 INFO twitter.TwitterInputDStream: Initialized and
validated org.apache.spark.streaming.twitter.TwitterInputDStream@10c8414a
14/08/10 14:12:25 INFO dstream.MappedDStream: Slide time = 1000 ms
14/08/10 14:12:25 INFO dstream.MappedDStream: Storage level =
StorageLevel(false, false, false, 1)
14/08/10 14:12:25 INFO dstream.MappedDStream: Checkpoint interval = null
14/08/10 14:12:25 INFO dstream.MappedDStream: Remember duration = 1000 ms
14/08/10 14:12:25 INFO dstream.MappedDStream: Initialized and validated
org.apache.spark.streaming.dstream.MappedDStream@4b3669e1
14/08/10 14:12:25 INFO dstream.ForEachDStream: Slide time = 1000 ms
14/08/10 14:12:25 INFO dstream.ForEachDStream: Storage level =
StorageLevel(false, false, false, 1)
14/08/10 14:12:25 INFO dstream.ForEachDStream: Checkpoint interval = null
14/08/10 14:12:25 INFO dstream.ForEachDStream: Remember duration = 1000 ms
14/08/10 14:12:25 INFO dstream.ForEachDStream: Initialized and validated
org.apache.spark.streaming.dstream.ForEachDStream@6161595a
14/08/10 14:12:25 INFO scheduler.JobGenerator: JobGenerator started at
1407705146000 ms
14/08/10 14:12:25 INFO scheduler.JobScheduler: JobScheduler started
14/08/10 14:12:26 INFO scheduler.NetworkInputTracker: Stream 0 received 0
blocks
14/08/10 14:12:26 INFO scheduler.JobScheduler: Added jobs for time
1407705146000 ms
-------------------------------------------
Time: 1407705146000 ms
-------------------------------------------
14/08/10 14:12:26 INFO scheduler.JobScheduler: Starting job streaming job
1407705146000 ms.0 from job set of time 1407705146000 ms
14/08/10 14:12:26 INFO scheduler.JobScheduler: Finished job streaming job
1407705146000 ms.0 from job set of time 1407705146000 ms
14/08/10 14:12:26 INFO scheduler.JobScheduler: Total delay: 0.056 s for
time 1407705146000 ms (execution: 0.012 s)
14/08/10 14:12:27 INFO scheduler.NetworkInputTracker: Stream 0 received 0
blocks
14/08/10 14:12:27 INFO scheduler.JobScheduler: Added jobs for time
1407705147000 ms
-------------------------------------------
14/08/10 14:12:27 INFO scheduler.JobScheduler: Starting job streaming job
1407705147000 ms.0 from job set of time 1407705147000 ms
Time: 1407705147000 ms
-------------------------------------------
14/08/10 14:12:27 INFO scheduler.JobScheduler: Finished job streaming job
1407705147000 ms.0 from job set of time 1407705147000 ms
14/08/10 14:12:27 INFO scheduler.JobScheduler: Total delay: 0.006 s for
time 1407705147000 ms (execution: 0.000 s)
14/08/10 14:12:28 INFO scheduler.NetworkInputTracker: Stream 0 received 0
blocks
14/08/10 14:12:28 INFO scheduler.JobScheduler: Added jobs for time
1407705148000 ms
-------------------------------------------
Time: 1407705148000 ms
-------------------------------------------
14/08/10 14:12:28 INFO scheduler.JobScheduler: Starting job streaming job
1407705148000 ms.0 from job set of time 1407705148000 ms
14/08/10 14:12:28 INFO scheduler.JobScheduler: Finished job streaming job
1407705148000 ms.0 from job set of time 1407705148000 ms
14/08/10 14:12:28 INFO scheduler.JobScheduler: Total delay: 0.004 s for
time 1407705148000 ms (execution: 0.000 s)
14/08/10 14:12:29 INFO scheduler.NetworkInputTracker: Stream 0 received 0
blocks
14/08/10 14:12:29 INFO scheduler.JobScheduler: Added jobs for time
1407705149000 ms
-------------------------------------------
Time: 1407705149000 ms
-------------------------------------------
14/08/10 14:12:29 INFO scheduler.JobScheduler: Starting job streaming job
1407705149000 ms.0 from job set of time 1407705149000 ms
14/08/10 14:12:29 INFO scheduler.JobScheduler: Finished job streaming job
1407705149000 ms.0 from job set of time 1407705149000 ms
14/08/10 14:12:29 INFO scheduler.JobScheduler: Total delay: 0.004 s for
time 1407705149000 ms (execution: 0.000 s)