Re: HBase Spark Streaming giving error after restore
org.apache.spark.SparkException: Job aborted due to stage failure:
Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException:
scala.runtime.BoxedUnit cannot be cast to org.apache.hadoop.hbase.client.Mutation
	at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
	at org.apache.spark.scheduler.Task.run(Task.scala:64)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

15/10/16 18:50:03 INFO CacheManager: Partition rdd_20_1 not found, computing it

On Fri, Oct 16, 2015 at 7:53 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> Can you show the complete stack trace?
>
> A subclass of Mutation is expected. Put is a subclass.
>
> Have you tried replacing BoxedUnit with Put in your code?
>
> Cheers
>
> On Fri, Oct 16, 2015 at 6:02 AM, Amit Singh Hora <hora.a...@gmail.com> wrote:
>> [quoted original message snipped]
HBase Spark Streaming giving error after restore
Hi All,

I am using the code below to stream data from Kafka to HBase. Everything works fine until I restart the job so that it can restore its state from the checkpoint directory; while trying to restore the state it gives me the error below:

Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException:
scala.runtime.BoxedUnit cannot be cast to org.apache.hadoop.hbase.client.Mutation

Please find the code below:

tweetsRDD.foreachRDD(rdd => {
  val hconf = HBaseConfiguration.create()
  hconf.set(TableOutputFormat.OUTPUT_TABLE, hbasetablename)
  hconf.set("zookeeper.session.timeout", conf.getString("hbase.zookeepertimeout"))
  hconf.set("hbase.client.retries.number", Integer.toString(1))
  hconf.set("zookeeper.recovery.retry", Integer.toString(1))
  hconf.set("hbase.master", conf.getString("hbase.hbase_master"))
  hconf.set("hbase.zookeeper.quorum", conf.getString("hbase.hbase_zkquorum"))
  hconf.set("zookeeper.znode.parent", "/hbase-unsecure")
  hconf.set("hbase.zookeeper.property.clientPort", conf.getString("hbase.hbase_zk_port"))
  hconf.setClass("mapreduce.outputformat.class",
    classOf[TableOutputFormat[String]],
    classOf[OutputFormat[String, BoxedUnit]])

  rdd.map(record => (new ImmutableBytesWritable, {
    // convert the JSON string to a Map
    var maprecord = new HashMap[String, String]
    val mapper = new ObjectMapper()
    maprecord = mapper.readValue(record.toString(),
      new TypeReference[HashMap[String, String]]() {})

    val ts: Long = maprecord.get("ts").toLong
    val tweetID: Long = maprecord.get("id").toLong
    val key = ts + "_" + tweetID
    val put = new Put(Bytes.toBytes(key))
    maprecord.foreach(kv => {
      // println(kv._1 + " - " + kv._2)
      put.add(Bytes.toBytes(colfamily.value), Bytes.toBytes(kv._1), Bytes.toBytes(kv._2))
    })
    put
  })).saveAsNewAPIHadoopDataset(hconf)
})

Help me out in solving this, as it is urgent for me.

--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/HBase-Spark-Streaming-giving-error-after-restore-tp25089.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
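One reading of the error in the post above: the setClass call declares the output format's value class as BoxedUnit, while the pairs produced by rdd.map(...) carry Put values. A minimal sketch of the same call with a Mutation-compatible value class instead (a sketch only, not a verified fix for the restore case; names taken from the post):

```scala
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.OutputFormat

// Declare the value class to match what the map actually produces (Put),
// rather than scala.runtime.BoxedUnit:
hconf.setClass("mapreduce.outputformat.class",
  classOf[TableOutputFormat[String]],
  classOf[OutputFormat[String, Put]])
```

Because Java generics are erased at runtime, only the raw OutputFormat interface is checked here; the declared value class still matters to code that reads this configuration entry.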
HBase Spark Streaming giving error after restore
Hi All,

I am using the code below to stream data from Kafka to HBase. Everything works fine until I restart the job so that it can restore its state from the checkpoint directory; while trying to restore the state it gives me the error below:

Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException:
scala.runtime.BoxedUnit cannot be cast to org.apache.hadoop.hbase.client.Mutation

[code identical to the previous message, except for the following:]

  hconf.setClass("mapreduce.outputformat.class",
    classOf[TableOutputFormat[String]],
    classOf[OutputFormat[String, BoxedUnit]])
  // i have also tried using
  // hconf.setClass("mapreduce.outputformat.class",
  //   classOf[TableOutputFormat[String]],
  //   classOf[OutputFormat[String, Mutation]])

Help me out in solving this, as it is urgent for me.

--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/HBase-Spark-Streaming-giving-error-after-restore-tp25090.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
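A possible explanation for why the Mutation variant did not help after a restart (an assumption, not confirmed in this thread): a streaming job restored from a checkpoint rebuilds the DStream graph, including the foreachRDD closure, from the serialized checkpoint data, so code changes only take effect on a fresh start with a cleared checkpoint directory. The usual recovery pattern looks roughly like this (createContext is a hypothetical helper standing for the poster's setup code):

```scala
import org.apache.spark.streaming.StreamingContext

// createContext builds the DStream graph (including the foreachRDD that writes
// to HBase) and calls ssc.checkpoint(checkpointDir).
// getOrCreate restores the previously serialized graph when checkpointDir
// already exists -- in that case the old closure (the one compiled with
// BoxedUnit) keeps running, regardless of later code edits.
val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(checkpointDir))
ssc.start()
ssc.awaitTermination()
```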
Re: HBase Spark Streaming giving error after restore
Can you show the complete stack trace?

A subclass of Mutation is expected. Put is such a subclass.

Have you tried replacing BoxedUnit with Put in your code?

Cheers

On Fri, Oct 16, 2015 at 6:02 AM, Amit Singh Hora <hora.a...@gmail.com> wrote:
> [quoted original message snipped]
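Ted's suggestion, concretized as a sketch (untested; the helper name buildPut is hypothetical, standing for the JSON-parsing and Put-building code from the original post): give the mapped RDD an explicit (ImmutableBytesWritable, Put) type, so the value can only be a Put and never an inferred Unit.

```scala
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.RDD

// buildPut is a hypothetical helper wrapping the JSON-to-Put logic above.
val puts: RDD[(ImmutableBytesWritable, Put)] =
  rdd.map(record => (new ImmutableBytesWritable, buildPut(record)))

// With the value statically typed as Put (a Mutation subclass), the
// TableOutputFormat record writer receives the type it expects.
puts.saveAsNewAPIHadoopDataset(hconf)
```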