Re: SPARKONHBase checkpointing issue

2015-10-28 Thread Tathagata Das
Yes, the workaround is the same as the one suggested in the JIRA for
accumulator and broadcast variables. Basically, make a singleton object
that lazily initializes the HBaseContext. Because it is a singleton, it
won't get serialized through the checkpoint; after recovery, it will be
lazily reinitialized. This is the exact same approach I used for
`SQLContext.getOrCreate()`
<https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L1259>.
Take a look at the code.
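
For what it's worth, a minimal sketch of that pattern applied to
HBaseContext could look like this (HBaseContextSingleton is an
illustrative name, and the import assumes Cloudera's SparkOnHBase;
adjust to your build):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.spark.SparkContext
import com.cloudera.spark.hbase.HBaseContext

object HBaseContextSingleton {
  // Lives outside any checkpointed closure, so it is never serialized.
  private var instance: HBaseContext = _

  def getOrCreate(sc: SparkContext): HBaseContext = synchronized {
    if (instance == null) {
      // Runs on first use, and again after recovery from a checkpoint,
      // because the field comes back null in the restarted JVM.
      instance = new HBaseContext(sc, HBaseConfiguration.create())
    }
    instance
  }
}

Call HBaseContextSingleton.getOrCreate(...) at the point of use (for
example inside foreachRDD) instead of holding a reference that gets
captured and serialized into the checkpointed DStream graph.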

On Tue, Oct 27, 2015 at 11:19 PM, Amit Hora wrote:

> Thanks for sharing the link. Yes, I understand that accumulator and
> broadcast variable state is not recovered from checkpoint, but is there
> any way by which I can say that the HBaseContext in this context should
> not be recovered from checkpoint but rather must be reinitialized?
> --
> From: Adrian Tanase
> Sent: 27-10-2015 18:08
> To: Amit Singh Hora; user@spark.apache.org
> Subject: Re: SPARKONHBase checkpointing issue
>
> Does this help?
>
> https://issues.apache.org/jira/browse/SPARK-5206

RE: SPARKONHBase checkpointing issue

2015-10-27 Thread Amit Hora
Thanks for sharing the link. Yes, I understand that accumulator and broadcast
variable state is not recovered from checkpoint, but is there any way by which
I can say that the HBaseContext in this context should not be recovered from
checkpoint but rather must be reinitialized?

-Original Message-
From: "Adrian Tanase"
Sent: 27-10-2015 18:08
To: "Amit Singh Hora"; "user@spark.apache.org"
Subject: Re: SPARKONHBase checkpointing issue

Does this help?

https://issues.apache.org/jira/browse/SPARK-5206

Re: SPARKONHBase checkpointing issue

2015-10-27 Thread Adrian Tanase
Does this help?

https://issues.apache.org/jira/browse/SPARK-5206



SPARKONHBase checkpointing issue

2015-10-27 Thread Amit Singh Hora
Hi all,

I am using Cloudera's SparkOnHBase to bulk insert into HBase. Please find
the code below:
// Imports added for completeness; the HBaseContext package assumes
// Cloudera's SparkOnHBase and may differ depending on your version.
import java.util.HashMap

import scala.collection.JavaConversions._

import com.cloudera.spark.hbase.HBaseContext
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.typesafe.config.{Config, ConfigFactory}

import kafka.serializer.StringDecoder

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object test {

  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.load("connection.conf").getConfig("connection")
    val checkpointDirectory = conf.getString("spark.checkpointDir")

    // Recover from the checkpoint if one exists, otherwise build a new context.
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => {
      functionToCreateContext(checkpointDirectory)
    })

    ssc.start()
    ssc.awaitTermination()
  }

  def getHbaseContext(sc: SparkContext, conf: Config): HBaseContext = {
    println("always gets created")
    val hconf = HBaseConfiguration.create()
    val timeout = conf.getString("hbase.zookeepertimeout")
    val master = conf.getString("hbase.hbase_master")
    val zk = conf.getString("hbase.hbase_zkquorum")
    val zkport = conf.getString("hbase.hbase_zk_port")

    hconf.set("zookeeper.session.timeout", timeout)
    hconf.set("hbase.client.retries.number", Integer.toString(1))
    hconf.set("zookeeper.recovery.retry", Integer.toString(1))
    hconf.set("hbase.master", master)
    hconf.set("hbase.zookeeper.quorum", zk)
    hconf.set("zookeeper.znode.parent", "/hbase-unsecure")
    hconf.set("hbase.zookeeper.property.clientPort", zkport)

    new HBaseContext(sc, hconf)
  }

  def functionToCreateContext(checkpointDirectory: String): StreamingContext = {
    println("creating for first time")
    val conf = ConfigFactory.load("connection.conf").getConfig("connection")
    val brokerlist = conf.getString("kafka.broker")
    val topic = conf.getString("kafka.topic")

    val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample")
    sparkConf.set("spark.cleaner.ttl", "2")
    sparkConf.setMaster("local[2]")

    val topicsSet = topic.split(",").toSet
    val batchduration = conf.getString("spark.batchduration").toInt
    val ssc = new StreamingContext(sparkConf, Seconds(batchduration))
    ssc.checkpoint(checkpointDirectory) // set checkpoint directory

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokerlist,
      "auto.offset.reset" -> "smallest")
    val messages = KafkaUtils.createDirectStream[String, String,
      StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    val lines = messages.map(_._2)

    getHbaseContext(ssc.sparkContext, conf).streamBulkPut[String](
      lines,
      "ecs_test",
      (putRecord) => {
        if (putRecord.length() > 0) {
          // convert the JSON string to a Map
          val mapper = new ObjectMapper()
          val maprecord: HashMap[String, String] = mapper.readValue(putRecord,
            new TypeReference[HashMap[String, String]]() {})

          val ts: Long = maprecord.get("ts").toLong
          val tweetID: Long = maprecord.get("id").toLong
          val key = ts + "_" + tweetID

          val put = new Put(Bytes.toBytes(key))
          maprecord.foreach(kv =>
            put.add(Bytes.toBytes("test"), Bytes.toBytes(kv._1), Bytes.toBytes(kv._2)))

          put
        } else {
          null
        }
      },
      false)

    ssc
  }
}

I am not able to recover from the checkpoint after a restart; I always get:
Unable to getConfig from broadcast

After debugging further, I can see that the method that creates the
HBaseContext actually broadcasts the configuration and context objects
passed to it.

As a solution, I just want to recreate the HBaseContext in every case,
whether the checkpoint exists or not.
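
Roughly, the restructuring I have in mind is the sketch below (untested; it
reuses getHbaseContext from the code above, holds the context in a field
that is never checkpointed, convertToPut is a hypothetical name for the
inline put-building function above, and it assumes SparkOnHBase's bulkPut
for RDDs):

object HBaseContextHolder {
  private var hbc: HBaseContext = _

  // Runs on the driver; lazily rebuilds the context after a restart,
  // because this field is never written into the checkpoint.
  def get(sc: SparkContext, conf: Config): HBaseContext = synchronized {
    if (hbc == null) hbc = test.getHbaseContext(sc, conf)
    hbc
  }
}

// ...and in functionToCreateContext, instead of wiring streamBulkPut with
// a context created up front:
lines.foreachRDD { rdd =>
  val conf = ConfigFactory.load("connection.conf").getConfig("connection")
  HBaseContextHolder.get(rdd.sparkContext, conf)
    .bulkPut[String](rdd.filter(_.length > 0), "ecs_test", convertToPut, false)
}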



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SPARKONHBase-checkpointing-issue-tp25211.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org