Re: SPARKONHBase checkpointing issue
Yes, the workaround is the same one suggested in the JIRA for accumulators and broadcast variables. Basically, make a singleton object that lazily initializes the HBaseContext. Because it is a singleton, it won't get serialized through the checkpoint. After recovery, it will be reinitialized lazily. This is exactly the approach I used for SQLContext.getOrCreate() <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L1259>. Take a look at the code.

On Tue, Oct 27, 2015 at 11:19 PM, Amit Hora wrote:
> Thanks for sharing the link. Yes, I understand that accumulators and
> broadcast variables state are not recovered from checkpoint but is there
> any way by which I can say that the HBaseContext in this context should not
> be recovered from checkpoint rather must be reinitialized
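The lazily initialized singleton described in this reply could be sketched roughly as follows. This is an illustration, not code from the thread or from SparkOnHBase: `LazySingleton` and `DemoHolder` are hypothetical names, and the demo uses plain types so it runs without Spark or HBase on the classpath. The commented lines at the end show how an HBaseContext might be plugged in; the `bulkPut` call and the `toPut` function there are assumptions to check against the SparkOnHBase version in use.

```scala
// Hypothetical helper: holds one value per JVM and builds it on first use.
// When a Scala `object` extends this class, closures refer to it statically,
// so the held value is not written into the streaming checkpoint. After a
// restart the holder is empty again and the value is rebuilt on first use.
abstract class LazySingleton[C, T] {
  @volatile private var instance: Option[T] = None

  /** Builds the value from a live context (for example a SparkContext). */
  protected def create(ctx: C): T

  /** Returns the cached value, creating it on the first call in this JVM. */
  def getOrCreate(ctx: C): T = instance match {
    case Some(v) => v
    case None =>
      synchronized {
        // Re-check under the lock so only one thread ever calls create().
        instance match {
          case Some(v) => v
          case None =>
            val v = create(ctx)
            instance = Some(v)
            v
        }
      }
  }
}

// Stand-in demo with plain types (String in, StringBuilder out).
object DemoHolder extends LazySingleton[String, StringBuilder] {
  var creations = 0 // counts how often create() actually runs

  protected def create(label: String): StringBuilder = {
    creations += 1
    new StringBuilder(s"context for $label")
  }
}

// Assumed wiring with the real libraries (unverified, for orientation only):
//
//   object HBaseContextHolder extends LazySingleton[SparkContext, HBaseContext] {
//     protected def create(sc: SparkContext): HBaseContext =
//       new HBaseContext(sc, HBaseConfiguration.create())
//   }
//
//   lines.foreachRDD { rdd =>
//     val hbaseContext = HBaseContextHolder.getOrCreate(rdd.sparkContext)
//     hbaseContext.bulkPut[String](rdd, "ecs_test", toPut, false)
//   }
```

The important design point is that the context is looked up inside the per-batch function rather than captured when the DStream graph is first defined, so a job recovered from a checkpoint builds a fresh HBaseContext instead of trying to reuse a stale broadcast.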
RE: SPARKONHBase checkpointing issue
Thanks for sharing the link. Yes, I understand that the state of accumulators and broadcast variables is not recovered from a checkpoint, but is there any way to specify that the HBaseContext here should not be recovered from the checkpoint and must instead be reinitialized?

-----Original Message-----
From: "Adrian Tanase"
Sent: 27-10-2015 18:08
To: "Amit Singh Hora"; "user@spark.apache.org"
Subject: Re: SPARKONHBase checkpointing issue

Does this help?

https://issues.apache.org/jira/browse/SPARK-5206
Re: SPARKONHBase checkpointing issue
Does this help?

https://issues.apache.org/jira/browse/SPARK-5206

On 10/27/15, 1:53 PM, "Amit Singh Hora" wrote:
>I am not able to retrieve from checkpoint after restart; I always get
>Unable to getConfig from broadcast
>
>After debugging more I can see that the method for creating the HBaseContext
>actually broadcasts the configuration and the context object passed.
>
>As a solution I just want to recreate the HBase context in every condition,
>whether the checkpoint exists or not.
SPARKONHBase checkpointing issue
Hi all,

I am using Cloudera's SparkOnHBase to bulk insert into HBase. Please find the code below:

object test {

  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.load("connection.conf").getConfig("connection")
    val checkpointDirectory = conf.getString("spark.checkpointDir")
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => {
      functionToCreateContext(checkpointDirectory)
    })

    ssc.start()
    ssc.awaitTermination()
  }

  def getHbaseContext(sc: SparkContext, conf: Config): HBaseContext = {
    println("always gets created")
    val hconf = HBaseConfiguration.create();
    val timeout = conf.getString("hbase.zookeepertimeout")
    val master = conf.getString("hbase.hbase_master")
    val zk = conf.getString("hbase.hbase_zkquorum")
    val zkport = conf.getString("hbase.hbase_zk_port")

    hconf.set("zookeeper.session.timeout", timeout);
    hconf.set("hbase.client.retries.number", Integer.toString(1));
    hconf.set("zookeeper.recovery.retry", Integer.toString(1));
    hconf.set("hbase.master", master);
    hconf.set("hbase.zookeeper.quorum", zk);
    hconf.set("zookeeper.znode.parent", "/hbase-unsecure");
    hconf.set("hbase.zookeeper.property.clientPort", zkport);

    val hbaseContext = new HBaseContext(sc, hconf);
    return hbaseContext
  }

  def functionToCreateContext(checkpointDirectory: String): StreamingContext = {
    println("creating for frst time")
    val conf = ConfigFactory.load("connection.conf").getConfig("connection")
    val brokerlist = conf.getString("kafka.broker")
    val topic = conf.getString("kafka.topic")

    val Array(brokers, topics) = Array(brokerlist, topic)

    val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample")
    sparkConf.set("spark.cleaner.ttl", "2");
    sparkConf.setMaster("local[2]")

    val topicsSet = topic.split(",").toSet
    val batchduration = conf.getString("spark.batchduration").toInt
    val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(batchduration))
    ssc.checkpoint(checkpointDirectory) // set checkpoint directory
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerlist, "auto.offset.reset" -> "smallest")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    val lines = messages.map(_._2)

    getHbaseContext(ssc.sparkContext, conf).streamBulkPut[String](lines,
      "ecs_test",
      (putRecord) => {
        if (putRecord.length() > 0) {
          var maprecord = new HashMap[String, String];
          val mapper = new ObjectMapper();

          // convert JSON string to Map
          maprecord = mapper.readValue(putRecord,
            new TypeReference[HashMap[String, String]]() {});

          var ts: Long = maprecord.get("ts").toLong
          var tweetID: Long = maprecord.get("id").toLong
          val key = ts + "_" + tweetID;

          val put = new Put(Bytes.toBytes(key))
          maprecord.foreach(kv => {
            put.add(Bytes.toBytes("test"), Bytes.toBytes(kv._1), Bytes.toBytes(kv._2))
          })

          put
        } else {
          null
        }
      },
      false);

    ssc
  }
}

I am not able to recover from the checkpoint after a restart; I always get:

Unable to getConfig from broadcast

After debugging further, I can see that the method that creates the HBaseContext actually broadcasts the configuration and the context object passed to it.

As a solution, I just want to recreate the HBase context in every case, whether the checkpoint exists or not.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SPARKONHBase-checkpointing-issue-tp25211.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.