Honestly, I would stay far away from saving offsets in ZooKeeper if at all possible. It's better to store them alongside your results.
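One way to read "alongside your results" in this thread's terms: the direct stream's RDDs carry their own offset ranges, so the offsets can be derived from the batch itself and made part of the same write as the data. A minimal sketch, assuming the un-transformed RDD from the direct stream (names are illustrative):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.kafka.HasOffsetRanges

    // Encode this batch's offset ranges into a path suffix, e.g.
    // "mytopic-0-5000-7500_mytopic-1-4800-7200", so the Parquet directory
    // name itself records exactly which offsets it contains.
    // Must be called on the RDD straight off the direct stream,
    // before any transformation, or the cast will fail.
    def offsetSuffix(rdd: RDD[(String, Array[Byte])]): String =
      rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        .map(r => s"${r.topic}-${r.partition}-${r.fromOffset}-${r.untilOffset}")
        .mkString("_")

On restart, listing the output directories and taking the highest untilOffset per partition recovers the position to resume from, with no second store (ZooKeeper or otherwise) to keep consistent with the data.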
On Wed, Oct 26, 2016 at 10:44 AM, Sunita Arvind <sunitarv...@gmail.com> wrote:
> This is enough to get it to work:
>
> df.save(conf.getString("ParquetOutputPath")+offsetSaved, "parquet", SaveMode.Overwrite)
>
> And tests so far (in local env) seem good with the edits. Yet to test on the
> cluster. Cody, appreciate your thoughts on the edits.
>
> Just want to make sure I am not doing overkill or overlooking a potential
> issue.
>
> regards
> Sunita
>
> On Tue, Oct 25, 2016 at 2:38 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:
>>
>> The error in the file I just shared is here:
>>
>> val partitionOffsetPath: String = topicDirs.consumerOffsetDir + "/" + partition._2(0)
>> --> this was just `partition` before, hence there was an error fetching the offset.
>>
>> Still testing. Somehow, Cody, your code never led to "file already exists"
>> kinds of errors (I am saving the output of the dstream as a parquet file
>> after converting it to a dataframe; the batch interval will be 2 hrs).
>>
>> The code in the main is here:
>>
>> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
>>   conf.getString("groupId"), conf.getString("topics"))
>> val storedOffsets = offsetsStore.readOffsets()
>> LogHandler.log.info("Fetched the offset from zookeeper")
>>
>> val kafkaArr = storedOffsets match {
>>   case None =>
>>     // start from the initial offsets
>>     KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>>       ssc, kafkaProps, Set(topics))
>>   case Some(fromOffsets) =>
>>     // start from previously saved offsets
>>     val messageHandler: MessageAndMetadata[String, Array[Byte]] => (String, Array[Byte]) =
>>       (mmd: MessageAndMetadata[String, Array[Byte]]) => (mmd.key, mmd.message)
>>     KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder,
>>       (String, Array[Byte])](ssc, kafkaProps, fromOffsets, messageHandler)
>>     //KafkaUtils.createRDD[String, Row, StringDecoder, ProtobufMessage,
>>     //  (String, Row)](sc, kafkaProps, fromOffsets, messageHandler)
>> }
>>
>> kafkaArr.foreachRDD { (rdd, time) =>
>>   val schema = SchemaConverters.toSqlType(BeaconAvroData.getClassSchema)
>>     .dataType.asInstanceOf[StructType]
>>   val ardd: RDD[Row] = rdd.mapPartitions { itr =>
>>     itr.map(r => Row.fromSeq(AvroUtils.avroToList(AvrodataUtils.getAvroData(r._2)).toArray))
>>   }
>>   val df = sql.createDataFrame(ardd, schema)
>>   LogHandler.log.info("Created dataframe")
>>   val offsetSaved = offsetsStore.saveOffsets(topics, rdd).replace(":", "-").replace(",", "_")
>>   LogHandler.log.info("Saved offset to Zookeeper")
>>   df.saveAsParquetFile(conf.getString("ParquetOutputPath") + offsetSaved)
>>   LogHandler.log.info("Created the parquet file")
>> }
>>
>> Thanks
>> Sunita
>>
>> On Tue, Oct 25, 2016 at 2:11 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:
>>>
>>> Attached is the edited code. Am I heading in the right direction? Also, I am
>>> missing something due to which it seems to work well as long as the
>>> application is running and the files are created right. But as soon as I
>>> restart the application, it goes back to fromOffset as 0. Any thoughts?
>>>
>>> regards
>>> Sunita
>>>
>>> On Tue, Oct 25, 2016 at 1:52 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:
>>>>
>>>> Thanks for confirming, Cody.
>>>> To use the library, I had to do:
>>>>
>>>> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
>>>>   "/consumers/topics/" + topics + "/0")
>>>>
>>>> It worked well. However, I had to specify the partitionId in the zkPath.
>>>> If I want the library to pick up all the partitions for a topic, without me
>>>> specifying the path, is it possible out of the box or do I need to tweak?
>>>>
>>>> regards
>>>> Sunita
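For reference, the saving side of the ZooKeeper store linked further down in this thread amounts to persisting "partition:offset" pairs under a zkPath. A minimal sketch, assuming a zkClient constructed as in that class (the ZK address here is illustrative), saving untilOffset so the next run resumes after this batch:

    import kafka.utils.{ZKStringSerializer, ZkUtils}
    import org.I0Itec.zkclient.ZkClient
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.kafka.HasOffsetRanges

    val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)

    // Persist "partition:untilOffset" pairs, e.g. "0:5000,1:4800", under zkPath.
    def saveOffsets(zkPath: String, rdd: RDD[_]): Unit = {
      val offsetsRangesStr = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        .map(r => s"${r.partition}:${r.untilOffset}")
        .mkString(",")
      ZkUtils.updatePersistentPath(zkClient, zkPath, offsetsRangesStr)
    }

Note that in the foreachRDD above, saveOffsets runs before saveAsParquetFile; committing offsets only after the write has succeeded is the usual ordering for at-least-once delivery, since committing first means a crash between the two steps loses that batch.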
>>>> On Tue, Oct 25, 2016 at 12:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>
>>>>> You are correct that you shouldn't have to worry about broker id.
>>>>>
>>>>> I'm honestly not sure specifically what else you are asking at this point.
>>>>>
>>>>> On Tue, Oct 25, 2016 at 1:39 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:
>>>>> > Just re-read the Kafka architecture. Something that slipped my mind is,
>>>>> > it is leader based. So a topic/partitionId pair will be the same on all
>>>>> > the brokers, and we do not need to consider brokerId while storing
>>>>> > offsets. Still exploring the rest of the items.
>>>>> > regards
>>>>> > Sunita
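That matches the 0.8-era consumer API: an offset is keyed by (topic, partition) alone, because every replica of a partition holds the same log. A small illustration (topic name and offsets are made up), which is also exactly the fromOffsets shape createDirectStream expects:

    import kafka.common.TopicAndPartition

    // (topic, partition) fully identifies an offset; no brokerId is needed,
    // since replicas on other brokers hold the same log positions.
    val fromOffsets: Map[TopicAndPartition, Long] = Map(
      TopicAndPartition("beacon", 0) -> 5000L,
      TopicAndPartition("beacon", 1) -> 4800L
    )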
>>>>> > On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind <sunitarv...@gmail.com> wrote:
>>>>> >>
>>>>> >> Hello Experts,
>>>>> >>
>>>>> >> I am trying to use the saving-to-ZK design. Just saw Sudhir's comments
>>>>> >> that it is an old approach. Any reasons for that? Any issues observed
>>>>> >> with saving to ZK? The way we are planning to use it is:
>>>>> >> 1. Following
>>>>> >> http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
>>>>> >> 2. Saving to the same file with the offsetRange as a part of the file
>>>>> >> name. We hope that no partial writes or overwriting are possible with
>>>>> >> the offsetRanges in the name.
>>>>> >>
>>>>> >> However, I have the below doubts, which I couldn't figure out from the
>>>>> >> code here:
>>>>> >> https://github.com/ippontech/spark-kafka-source/blob/master/src/main/scala/com/ippontech/kafka/stores/ZooKeeperOffsetsStore.scala
>>>>> >> 1. The brokerId is not part of the OffsetRange. How will just
>>>>> >> partitionId:fromOffset stay unique in a cluster with multiple brokers
>>>>> >> and multiple partitions per topic?
>>>>> >> 2. Do we have to specify the zkPath to include the partitionId? I tried
>>>>> >> using the ZooKeeperOffsetsStore as-is, and it required me to specify the
>>>>> >> partitionId:
>>>>> >>
>>>>> >> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
>>>>> >>   "/consumers/topics/" + topics + "/0")
>>>>> >>
>>>>> >> For our use cases it is too limiting to include the partitionId in the
>>>>> >> path. To get it to work by automatically detecting the existing
>>>>> >> partitions for a given topic, I changed it as below (inspired by
>>>>> >> http://www.programcreek.com/java-api-examples/index.php?api=kafka.utils.ZKGroupTopicDirs):
>>>>> >>
>>>>> >> /**
>>>>> >>  * zkServers Zookeeper server string: host1:port1[,host2:port2,...]
>>>>> >>  * groupID consumer group to get offsets for
>>>>> >>  * topic topic to get offsets for
>>>>> >>  * return - mapping of (topic and) partition to offset
>>>>> >>  */
>>>>> >> private def getOffsets(groupID: String, topic: String): Option[String] = {
>>>>> >>   val topicDirs = new ZKGroupTopicDirs(groupID, topic)
>>>>> >>   val offsets = new mutable.HashMap[TopicAndPartition, Long]()
>>>>> >>   val topicSeq = List(topic).toSeq
>>>>> >>   // try {
>>>>> >>   val partitions = ZkUtils.getPartitionsForTopics(zkClient, topicSeq)
>>>>> >>   var partition: Object = null
>>>>> >>   for (partition <- partitions) {
>>>>> >>     val partitionOffsetPath: String = topicDirs.consumerOffsetDir + "/" + partition
>>>>> >>     val maybeOffset: Option[String] = ZkUtils.readDataMaybeNull(zkClient,
>>>>> >>       partitionOffsetPath)._1
>>>>> >>     val offset: Long = if (maybeOffset.isDefined) maybeOffset.get.toLong else 0L
>>>>> >>     val topicAndPartition: TopicAndPartition =
>>>>> >>       new TopicAndPartition(topic, Integer.parseInt(partition.toString))
>>>>> >>     offsets.put(topicAndPartition, offset)
>>>>> >>   }
>>>>> >>   // }
>>>>> >>   Option(offsets.mkString(","))
>>>>> >> }
>>>>> >>
>>>>> >> // Read the previously saved offsets from Zookeeper
>>>>> >> override def readOffsets: Option[Map[TopicAndPartition, Long]] = {
>>>>> >>   LogHandler.log.info("Reading offsets from ZooKeeper")
>>>>> >>   val offsetsRangesStrOpt = getOffsets(consumerGrp, topic)
>>>>> >>   val start = System.currentTimeMillis()
>>>>> >>   offsetsRangesStrOpt match {
>>>>> >>     case Some(offsetsRangesStr) =>
>>>>> >>       LogHandler.log.debug(s"Read offset ranges: ${offsetsRangesStr}")
>>>>> >>       val offsets = offsetsRangesStr.split(",")
>>>>> >>         .map(s => s.split(":"))
>>>>> >>         .map { case Array(partitionStr, offsetStr) =>
>>>>> >>           (TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong) }
>>>>> >>         .toMap
>>>>> >>       LogHandler.log.info("Done reading offsets from ZooKeeper. Took " +
>>>>> >>         (System.currentTimeMillis() - start))
>>>>> >>       Some(offsets)
>>>>> >>     case None =>
>>>>> >>       LogHandler.log.info("No offsets found in ZooKeeper. Took " +
>>>>> >>         (System.currentTimeMillis() - start))
>>>>> >>       None
>>>>> >>   }
>>>>> >> }
>>>>> >>
>>>>> >> However, I am concerned whether saveOffsets will work well with this
>>>>> >> approach. That's when I realized we are not considering brokerIds while
>>>>> >> storing offsets, and OffsetRange probably does not have it either. It
>>>>> >> can only provide topic, partition, from and until offsets.
>>>>> >>
>>>>> >> I am probably missing something very basic. Probably the library works
>>>>> >> well by itself. Can someone/Cody explain?
>>>>> >>
>>>>> >> Cody, thanks a lot for sharing your work.
>>>>> >>
>>>>> >> regards
>>>>> >> Sunita
>>>>> >>
>>>>> >> On Tue, Aug 23, 2016 at 11:21 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>> >>>
>>>>> >>> See
>>>>> >>> https://github.com/koeninger/kafka-exactly-once
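For reference, a compacted version of the getOffsets above, with the fix Sunita mentions in her later mail (iterating the partition ids from the (topic, partitions) pair rather than the pair itself), might look like the sketch below; zkClient is assumed to exist as in the linked store. Returning the map directly also sidesteps a subtle mismatch in the original: HashMap.mkString produces "key -> value" entries that the split(":") in readOffsets cannot parse.

    import kafka.common.TopicAndPartition
    import kafka.utils.{ZKGroupTopicDirs, ZkUtils}

    // Enumerate a topic's partitions from ZK and read each saved offset,
    // defaulting to 0L when none has been stored yet.
    def getOffsets(groupId: String, topic: String): Map[TopicAndPartition, Long] = {
      val topicDirs = new ZKGroupTopicDirs(groupId, topic)
      val partitionIds = ZkUtils.getPartitionsForTopics(zkClient, Seq(topic))(topic)
      partitionIds.map { p =>
        val path = s"${topicDirs.consumerOffsetDir}/$p"
        val offset = ZkUtils.readDataMaybeNull(zkClient, path)._1
          .map(_.toLong).getOrElse(0L)
        TopicAndPartition(topic, p) -> offset
      }.toMap
    }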
>>>>> >>> On Aug 23, 2016 10:30 AM, "KhajaAsmath Mohammed" <mdkhajaasm...@gmail.com> wrote:
>>>>> >>>>
>>>>> >>>> Hi Experts,
>>>>> >>>>
>>>>> >>>> I am looking for some information on how to achieve zero data loss
>>>>> >>>> while working with Kafka and Spark. I have searched online and the
>>>>> >>>> blogs have different answers. Please let me know if anyone has an
>>>>> >>>> idea on this.
>>>>> >>>>
>>>>> >>>> Blog 1:
>>>>> >>>> https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html
>>>>> >>>>
>>>>> >>>> Blog 2:
>>>>> >>>> http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
>>>>> >>>>
>>>>> >>>> Blog 1 simply suggests a configuration change with a checkpoint
>>>>> >>>> directory, and Blog 2 gives details on how to save offsets to
>>>>> >>>> ZooKeeper. Can you please help me out with the right approach?
>>>>> >>>>
>>>>> >>>> Thanks,
>>>>> >>>> Asmath
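For what it's worth, the approach in Blog 1 boils down to creating the StreamingContext through StreamingContext.getOrCreate with a checkpoint directory, so offsets are recovered from the checkpoint on restart. A minimal sketch (app name, path, and batch interval are illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "hdfs:///tmp/stream-checkpoint"  // illustrative path

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("kafka-zero-data-loss")
      val ssc = new StreamingContext(conf, Seconds(300))
      ssc.checkpoint(checkpointDir)
      // set up the direct stream and output operations here
      ssc
    }

    // On a clean start this builds a new context; after a crash it restores
    // the context (including Kafka offsets) from the checkpoint.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()

The usual caveat, and much of the motivation for Blog 2's approach, is that checkpoints are not recoverable across application code changes, which is why many pipelines store offsets externally instead.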