Hi,

I am trying to write data produced by the Kafka command-line producer for
a topic to HBase, using Spark Streaming. I have hit a problem and am unable
to proceed. Below is my code, which I build into a jar and run with
spark-submit. Am I doing something wrong inside foreachRDD()? What is wrong
at the SparkKafkaDemo$2.call(SparkKafkaDemo.java:63) line in the error
message below?



    SparkConf sparkConf = new SparkConf()
            .setAppName("JavaKafkaDemo")
            .setMaster("local")
            .setSparkHome("/Users/kvk/softwares/spark-1.3.1-bin-hadoop2.4");

    // Create the context with a 5 second batch interval
    JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(5000));

    int numThreads = 2;
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    // topicMap.put("viewTopic", numThreads);
    topicMap.put("nonview", numThreads);

    JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jsc, "localhost", "ViewConsumer", topicMap);

    // Keep only the message value from each (key, value) pair
    JavaDStream<String> lines = messages.map(
            new Function<Tuple2<String, String>, String>() {
                @Override
                public String call(Tuple2<String, String> tuple2) {
                    return tuple2._2();
                }
            });

    lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
        @Override
        public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
            // Turn each line into an HBase Put with a random row key
            JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = stringJavaRDD.mapToPair(
                    new PairFunction<String, ImmutableBytesWritable, Put>() {
                        @Override
                        public Tuple2<ImmutableBytesWritable, Put> call(String line)
                                throws Exception {
                            Put put = new Put(Bytes.toBytes("Rowkey" + Math.random()));
                            put.addColumn(Bytes.toBytes("firstFamily"),
                                    Bytes.toBytes("firstColumn"),
                                    Bytes.toBytes(line + "fc"));
                            return new Tuple2<ImmutableBytesWritable, Put>(
                                    new ImmutableBytesWritable(), put);
                        }
                    });

            // Save to HBase using Spark's built-in API method
            // (newAPIJobConfiguration1 is declared elsewhere, not shown)
            hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration());
            return null;
        }
    });

    jsc.start();
    jsc.awaitTermination();





I see the error below when I run it with spark-submit:


./bin/spark-submit --class "SparkKafkaDemo" --master local /Users/kvk/IntelliJWorkspace/HbaseDemo/HbaseDemo.jar

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.rdd.RDD.map(RDD.scala:286)
    at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:113)
    at org.apache.spark.api.java.AbstractJavaRDDLike.mapToPair(JavaRDDLike.scala:46)
    at SparkKafkaDemo$2.call(SparkKafkaDemo.java:63)
    at SparkKafkaDemo$2.call(SparkKafkaDemo.java:60)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
    at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:283)
    at org.apache.hadoop.mapreduce.Job.toString(Job.java:452)
    at java.lang.String.valueOf(String.java:2847)
    at java.lang.StringBuilder.append(StringBuilder.java:128)
    at scala.StringContext.standardInterpolator(StringContext.scala:122)
    at scala.StringContext.s(StringContext.scala:90)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:103)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 24 more



Thanks


-kvk
