Can you show the complete stack trace?

A subclass of Mutation is expected there, and Put is one such subclass.

Have you tried replacing BoxedUnit with Put in your code?
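
For example, a minimal sketch against your snippet (same hconf; Put is what
your map already emits, so the declared OutputFormat value class should match):

    // declare Put (a Mutation subclass), not BoxedUnit, as the value class
    // of the OutputFormat used by saveAsNewAPIHadoopDataset
    hconf.setClass("mapreduce.outputformat.class",
      classOf[TableOutputFormat[String]],
      classOf[OutputFormat[String, Put]])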

Cheers

On Fri, Oct 16, 2015 at 6:02 AM, Amit Singh Hora <hora.a...@gmail.com>
wrote:

> Hi All,
>
> I am using the code below to stream data from Kafka to HBase. Everything
> works fine until I restart the job so that it can restore its state from
> the checkpoint directory, but while trying to restore the state it gives
> me the following error:
>
> ge 0.0 (TID 0, localhost): java.lang.ClassCastException:
> scala.runtime.BoxedUnit cannot be cast to
> org.apache.hadoop.hbase.client.Mutation
>
> Please find the code below:
>
> tweetsRDD.foreachRDD(rdd => {
>   val hconf = HBaseConfiguration.create()
>   hconf.set(TableOutputFormat.OUTPUT_TABLE, hbasetablename)
>   hconf.set("zookeeper.session.timeout", conf.getString("hbase.zookeepertimeout"))
>   hconf.set("hbase.client.retries.number", Integer.toString(1))
>   hconf.set("zookeeper.recovery.retry", Integer.toString(1))
>   hconf.set("hbase.master", conf.getString("hbase.hbase_master"))
>   hconf.set("hbase.zookeeper.quorum", conf.getString("hbase.hbase_zkquorum"))
>   hconf.set("zookeeper.znode.parent", "/hbase-unsecure")
>   hconf.set("hbase.zookeeper.property.clientPort", conf.getString("hbase.hbase_zk_port"))
>   hconf.setClass("mapreduce.outputformat.class",
>     classOf[TableOutputFormat[String]], classOf[OutputFormat[String, BoxedUnit]])
>
>   rdd.map(record => (new ImmutableBytesWritable, {
>     // convert the JSON string to a Map
>     val mapper = new ObjectMapper()
>     val maprecord: HashMap[String, String] = mapper.readValue(record.toString(),
>       new TypeReference[HashMap[String, String]]() {})
>
>     // row key: ts_tweetID
>     val ts: Long = maprecord.get("ts").toLong
>     val tweetID: Long = maprecord.get("id").toLong
>     val key = ts + "_" + tweetID
>
>     // one column per JSON field, all in the broadcast column family
>     val put = new Put(Bytes.toBytes(key))
>     maprecord.foreach(kv =>
>       put.add(Bytes.toBytes(colfamily.value), Bytes.toBytes(kv._1), Bytes.toBytes(kv._2)))
>     put
>   })).saveAsNewAPIHadoopDataset(hconf)
> })
>
>
>
> Please help me out in solving this; it is urgent for me.
