Hi,
We are designing a solution that pulls file paths from Kafka and, at this stage, simply counts the lines in each of those files.
When we run the code, it fails with:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:444)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$class.mapToPair(JavaDStreamLike.scala:146)
    at org.apache.spark.streaming.api.java.JavaPairDStream.mapToPair(JavaPairDStream.scala:46)
    at streamReader.App.main(App.java:66)
Is using jssc.sparkContext() from inside a map function wrong?
This is the code we are using:
SparkConf conf = new SparkConf()
        .setAppName("Simple Application")
        .setMaster("spark://namenode:7077");

// KAFKA
final JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));

Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put("uploadedFiles", 1);
JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, "localhost:2181", "group3", topicMap);

// file paths arriving from Kafka
JavaDStream<String> files = messages.map(new Function<Tuple2<String, String>, String>() {
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

// count the lines of each file -- this mapToPair is App.java:66 from the stack trace
JavaPairDStream<String, Integer> pairs = messages.mapToPair(
        new PairFunction<Tuple2<String, String>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<String, String> word) throws Exception {
                JavaRDD<String> textfile = jssc.sparkContext().textFile(word._2());
                int test = new Long(textfile.count()).intValue();
                return new Tuple2<String, Integer>(word._2(), test);
            }
        });
System.out.println("Printing Messages:");
pairs.print();
jssc.start();
jssc.awaitTermination();
jssc.close();
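
For what it's worth, below is a rough, untested sketch of the alternative we were considering: collecting the file paths to the driver inside foreachRDD and doing the line counts there, so that no SparkContext or StreamingContext reference ends up in a closure Spark has to serialize. It reuses the "files" DStream from the code above; the variable names and output format are just ours for illustration, and we are not using checkpointing, so we assume keeping this work on the driver is acceptable. Is something like this the recommended pattern?

files.foreachRDD(new Function<JavaRDD<String>, Void>() {
    public Void call(JavaRDD<String> rdd) throws Exception {
        // collect() pulls this batch's file paths back to the driver;
        // we assume each batch only carries a small number of paths
        for (String path : rdd.collect()) {
            // rdd.context() gives the driver-side SparkContext, so we
            // never capture jssc inside the function itself
            long lineCount = rdd.context().textFile(path, 1).count();
            System.out.println(path + " -> " + lineCount + " lines");
        }
        return null;
    }
});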
Thanks,
Daniel