Honestly, I would stay far away from saving offsets in Zookeeper if at
all possible. It's better to store them alongside your results.
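
To make that concrete, here is a minimal sketch of one way to keep offsets next to the output when the output is files. It is not taken from kafka-exactly-once. OffsetRange below is a hypothetical stand-in for Spark's class so the example runs without Spark or Kafka, and the names commitBatch, latestOffsets, "_offsets" and "batch-N" are made up for illustration. It assumes a filesystem where a directory rename is atomic. Each batch is staged together with its offsets and published with a single rename, so a restart should never see results without offsets or offsets without results.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical stand-in for Spark's OffsetRange (assumption, not the real class).
final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

object OffsetsWithResults {
  private val OffsetsFile = "_offsets" // illustrative file name
  private val BatchPrefix = "batch-"   // illustrative directory prefix

  /** Stages one batch's rows and offset ranges, then publishes both with one rename. */
  def commitBatch(outputRoot: Path, batchId: Long, rows: Seq[String], ranges: Seq[OffsetRange]): Path = {
    Files.createDirectories(outputRoot)
    val staging = Files.createTempDirectory(outputRoot, ".staging-")
    Files.write(staging.resolve("part-00000"), rows.mkString("\n").getBytes(UTF_8))
    val encoded = ranges.map(r => s"${r.topic}:${r.partition}:${r.fromOffset}:${r.untilOffset}")
    Files.write(staging.resolve(OffsetsFile), encoded.mkString("\n").getBytes(UTF_8))
    // The rename makes results and offsets visible together, or not at all.
    val target = outputRoot.resolve(BatchPrefix + "%020d".format(batchId))
    Files.move(staging, target, StandardCopyOption.ATOMIC_MOVE)
  }

  /** Next offset to read per (topic, partition), taken from the latest published batch. */
  def latestOffsets(outputRoot: Path): Option[Map[(String, Int), Long]] =
    if (!Files.isDirectory(outputRoot)) None
    else {
      val stream = Files.list(outputRoot)
      try {
        val it = stream.iterator()
        var names = List.empty[String]
        while (it.hasNext) names ::= it.next().getFileName.toString
        // Zero-padded batch ids sort lexicographically; staging directories are ignored.
        names.filter(_.startsWith(BatchPrefix)).sorted.lastOption.map { name =>
          val bytes = Files.readAllBytes(outputRoot.resolve(name).resolve(OffsetsFile))
          new String(bytes, UTF_8).split("\n").filter(_.nonEmpty).map { line =>
            val Array(topic, partition, _, until) = line.split(":")
            (topic, partition.toInt) -> until.toLong
          }.toMap
        }
      } finally stream.close()
    }
}
```

On startup you would call latestOffsets and pass the result as fromOffsets; None means there is no committed batch yet. Contrast that with saving the offsets first and writing the files afterwards: if the write fails, the stored offsets have already moved past data that was never saved.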

On Wed, Oct 26, 2016 at 10:44 AM, Sunita Arvind <sunitarv...@gmail.com> wrote:
> This is enough to get it to work:
>
> df.save(conf.getString("ParquetOutputPath") + offsetSaved, "parquet", SaveMode.Overwrite)
>
> And tests so far (in local env) seem good with the edits. Yet to test on the
> cluster. Cody, appreciate your thoughts on the edits.
>
> Just want to make sure I am not overdoing it or overlooking a potential
> issue.
>
> regards
>
> Sunita
>
>
> On Tue, Oct 25, 2016 at 2:38 PM, Sunita Arvind <sunitarv...@gmail.com>
> wrote:
>>
>> The error in the file I just shared is here:
>>
>> val partitionOffsetPath: String = topicDirs.consumerOffsetDir + "/" + partition._2(0)
>>
>> --> this was just partition, hence the error when fetching the offset.
>>
>> Still testing. Somehow, Cody, your code never led to "file already exists"
>> errors (I am saving the output of the DStream as a Parquet file after
>> converting it to a DataFrame; the batch interval will be 2 hrs).
>>
>> The code in the main is here:
>>
>> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
>>   conf.getString("groupId"), conf.getString("topics"))
>> val storedOffsets = offsetsStore.readOffsets()
>> LogHandler.log.info("Fetched the offset from zookeeper")
>>
>> val kafkaArr = storedOffsets match {
>>   case None =>
>>     // start from the initial offsets
>>     KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>>       ssc, kafkaProps, Set(topics))
>>
>>   case Some(fromOffsets) =>
>>     // start from previously saved offsets
>>     val messageHandler: MessageAndMetadata[String, Array[Byte]] => (String, Array[Byte]) =
>>       (mmd: MessageAndMetadata[String, Array[Byte]]) => (mmd.key, mmd.message)
>>
>>     KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder, Tuple2[String, Array[Byte]]](
>>       ssc, kafkaProps, fromOffsets, messageHandler)
>>
>>     //KafkaUtils.createRDD[String,Row,StringDecoder,ProtobufMessage, (String, Row)](sc, kafkaProps, fromOffsets, messageHandler)
>> }
>>
>> kafkaArr.foreachRDD { (rdd, time) =>
>>   val schema = SchemaConverters.toSqlType(BeaconAvroData.getClassSchema)
>>     .dataType.asInstanceOf[StructType]
>>   val ardd: RDD[Row] = rdd.mapPartitions { itr =>
>>     itr.map(r => Row.fromSeq(AvroUtils.avroToList(AvrodataUtils.getAvroData(r._2)).toArray))
>>   }
>>   val df = sql.createDataFrame(ardd, schema)
>>   LogHandler.log.info("Created dataframe")
>>   val offsetSaved = offsetsStore.saveOffsets(topics, rdd).replace(":", "-").replace(",", "_")
>>   LogHandler.log.info("Saved offset to Zookeeper")
>>   df.saveAsParquetFile(conf.getString("ParquetOutputPath") + offsetSaved)
>>   LogHandler.log.info("Created the parquet file")
>> }
>>
>> Thanks
>>
>> Sunita
>>
>>
>>
>>
>>
>> On Tue, Oct 25, 2016 at 2:11 PM, Sunita Arvind <sunitarv...@gmail.com>
>> wrote:
>>>
>>> Attached is the edited code. Am I heading in the right direction? Also, I
>>> am missing something: it works well as long as the application is running,
>>> and the files are created correctly, but as soon as I restart the
>>> application it goes back to a fromOffset of 0. Any thoughts?
>>>
>>> regards
>>> Sunita
>>>
>>> On Tue, Oct 25, 2016 at 1:52 PM, Sunita Arvind <sunitarv...@gmail.com>
>>> wrote:
>>>>
>>>> Thanks for confirming Cody.
>>>> To use the library, I had to do:
>>>>
>>>> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), "/consumers/topics/" + topics + "/0")
>>>>
>>>> It worked well. However, I had to specify the partitionId in the zkPath.
>>>> If I want the library to pick up all the partitions for a topic without me
>>>> specifying the path, is that possible out of the box, or do I need to
>>>> tweak it?
>>>>
>>>> regards
>>>> Sunita
>>>>
>>>>
>>>> On Tue, Oct 25, 2016 at 12:08 PM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>>
>>>>> You are correct that you shouldn't have to worry about broker id.
>>>>>
>>>>> I'm honestly not sure specifically what else you are asking at this
>>>>> point.
>>>>>
>>>>> On Tue, Oct 25, 2016 at 1:39 PM, Sunita Arvind <sunitarv...@gmail.com>
>>>>> wrote:
>>>>> > Just re-read the Kafka architecture. Something that slipped my mind
>>>>> > is that it is leader-based, so a topic/partitionId pair will be the
>>>>> > same on all the brokers, and we do not need to consider the brokerId
>>>>> > while storing offsets. Still exploring the rest of the items.
>>>>> > regards
>>>>> > Sunita
>>>>> >
>>>>> > On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind
>>>>> > <sunitarv...@gmail.com>
>>>>> > wrote:
>>>>> >>
>>>>> >> Hello Experts,
>>>>> >>
>>>>> >> I am trying to use the saving-to-ZK design. Just saw Sudhir's
>>>>> >> comments that it is an old approach. Any reasons for that? Any
>>>>> >> issues observed with saving to ZK? The way we are planning to use
>>>>> >> it is:
>>>>> >> 1. Following
>>>>> >>
>>>>> >> http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
>>>>> >> 2. Saving to the same file with offsetRange as a part of the file.
>>>>> >> We hope
>>>>> >> that there are no partial writes/ overwriting is possible and
>>>>> >> offsetRanges
>>>>> >>
>>>>> >> However, I have the doubts below, which I couldn't resolve from the
>>>>> >> code here:
>>>>> >>
>>>>> >> https://github.com/ippontech/spark-kafka-source/blob/master/src/main/scala/com/ippontech/kafka/stores/ZooKeeperOffsetsStore.scala
>>>>> >> 1. The brokerId is not part of the OffsetRange. How will just the
>>>>> >> partitionId:FromOffset stay unique in a cluster with multiple
>>>>> >> brokers and multiple partitions per topic?
>>>>> >> 2. Do we have to specify the zkPath so that it includes the
>>>>> >> partitionId? I tried using the ZooKeeperOffsetsStore as is, and it
>>>>> >> required me to specify the partitionId:
>>>>> >>
>>>>> >> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), "/consumers/topics/" + topics + "/0")
>>>>> >>
>>>>> >> For our use cases it is too limiting to include the partitionId in
>>>>> >> the path. To get it to work by automatically detecting the existing
>>>>> >> partitions for a given topic, I changed it as below (inspired by
>>>>> >>
>>>>> >> http://www.programcreek.com/java-api-examples/index.php?api=kafka.utils.ZKGroupTopicDirs):
>>>>> >>
>>>>> >> /**
>>>>> >>   * zkServers Zookeeper server string: host1:port1[,host2:port2,...]
>>>>> >>   * groupID consumer group to get offsets for
>>>>> >>   * topic topic to get offsets for
>>>>> >>   * return - mapping of (topic and) partition to offset
>>>>> >>   */
>>>>> >> private def getOffsets(groupID: String, topic: String): Option[String] = {
>>>>> >>   val topicDirs = new ZKGroupTopicDirs(groupID, topic)
>>>>> >>   val offsets = new mutable.HashMap[TopicAndPartition, Long]()
>>>>> >>   val topicSeq = List(topic).toSeq
>>>>> >>   // try {
>>>>> >>   val partitions = ZkUtils.getPartitionsForTopics(zkClient, topicSeq)
>>>>> >>   var partition: Object = null
>>>>> >>   for (partition <- partitions) {
>>>>> >>     val partitionOffsetPath: String = topicDirs.consumerOffsetDir + "/" + partition
>>>>> >>     val maybeOffset: Option[String] = ZkUtils.readDataMaybeNull(zkClient, partitionOffsetPath)._1
>>>>> >>     val offset: Long = if (maybeOffset.isDefined) maybeOffset.get.toLong else 0L
>>>>> >>     val topicAndPartition: TopicAndPartition =
>>>>> >>       new TopicAndPartition(topic, Integer.parseInt(partition.toString))
>>>>> >>     offsets.put(topicAndPartition, offset)
>>>>> >>   }
>>>>> >>   // }
>>>>> >>   Option(offsets.mkString(","))
>>>>> >> }
>>>>> >>
>>>>> >> // Read the previously saved offsets from Zookeeper
>>>>> >> override def readOffsets: Option[Map[TopicAndPartition, Long]] = {
>>>>> >>
>>>>> >>   LogHandler.log.info("Reading offsets from ZooKeeper")
>>>>> >>
>>>>> >>   val offsetsRangesStrOpt = getOffsets(consumerGrp,topic)
>>>>> >>   val start = System.currentTimeMillis()
>>>>> >>   offsetsRangesStrOpt match {
>>>>> >>     case Some(offsetsRangesStr) =>
>>>>> >>       LogHandler.log.debug(s"Read offset ranges: ${offsetsRangesStr}")
>>>>> >>
>>>>> >>       val offsets = offsetsRangesStr.split(",")
>>>>> >>         .map(s => s.split(":"))
>>>>> >>         .map { case Array(partitionStr, offsetStr) =>
>>>>> >>           (TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong)
>>>>> >>         }
>>>>> >>         .toMap
>>>>> >>
>>>>> >>       LogHandler.log.info("Done reading offsets from ZooKeeper. Took " + (System.currentTimeMillis() - start))
>>>>> >>
>>>>> >>       Some(offsets)
>>>>> >>     case None =>
>>>>> >>       LogHandler.log.info("No offsets found in ZooKeeper. Took " + (System.currentTimeMillis() - start))
>>>>> >>       None
>>>>> >>   }
>>>>> >>
>>>>> >> }
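
A note on the two functions quoted above: readOffsets parses a string shaped like "partition:offset,partition:offset", but getOffsets returns offsets.mkString(",") over a map keyed by TopicAndPartition, which does not appear to produce that shape. Also, Option(...) around a non-null string is always Some, so the "case None" branch looks unreachable. Below is a minimal sketch of an encode/decode pair that round-trips. OffsetCodec and its method names are hypothetical, and it uses plain Int partition ids so it runs without Kafka on the classpath.

```scala
object OffsetCodec {
  /** Encodes partition -> offset as "partition:offset,partition:offset", sorted by partition.
    * Returns None when there is nothing stored, so callers can tell "no offsets" from "offset 0". */
  def encode(offsets: Map[Int, Long]): Option[String] =
    if (offsets.isEmpty) None
    else Some(offsets.toSeq.sortBy(_._1).map { case (p, o) => s"$p:$o" }.mkString(","))

  /** Parses a string produced by encode. A malformed entry raises an exception
    * instead of silently falling back to offset 0. */
  def decode(encoded: String): Map[Int, Long] =
    encoded.split(",").map(_.split(":")).map {
      case Array(p, o) => p.toInt -> o.toLong
      case other =>
        throw new IllegalArgumentException(s"Malformed offset entry: ${other.mkString(":")}")
    }.toMap
}
```

For example, OffsetCodec.encode(Map(1 -> 7L, 0 -> 5L)) gives Some("0:5,1:7"), and decoding that string gives the original map back.
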
>>>>> >>
>>>>> >> However, I am concerned about whether saveOffsets will work well
>>>>> >> with this approach. That's when I realized we are not considering
>>>>> >> brokerIds while storing offsets, and the OffsetRanges probably do
>>>>> >> not have them either. They can only provide the topic, partition,
>>>>> >> and from and until offsets.
>>>>> >>
>>>>> >> I am probably missing something very basic. Probably the library
>>>>> >> works well by itself. Can someone (or Cody) explain?
>>>>> >>
>>>>> >> Cody, Thanks a lot for sharing your work.
>>>>> >>
>>>>> >> regards
>>>>> >> Sunita
>>>>> >>
>>>>> >>
>>>>> >> On Tue, Aug 23, 2016 at 11:21 AM, Cody Koeninger
>>>>> >> <c...@koeninger.org>
>>>>> >> wrote:
>>>>> >>>
>>>>> >>> See
>>>>> >>> https://github.com/koeninger/kafka-exactly-once
>>>>> >>>
>>>>> >>> On Aug 23, 2016 10:30 AM, "KhajaAsmath Mohammed"
>>>>> >>> <mdkhajaasm...@gmail.com> wrote:
>>>>> >>>>
>>>>> >>>> Hi Experts,
>>>>> >>>>
>>>>> >>>> I am looking for some information on how to achieve zero data
>>>>> >>>> loss while working with Kafka and Spark. I have searched online,
>>>>> >>>> and the blogs give different answers. Please let me know if
>>>>> >>>> anyone has an idea on this.
>>>>> >>>>
>>>>> >>>> Blog 1:
>>>>> >>>>
>>>>> >>>>
>>>>> >>>> https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html
>>>>> >>>>
>>>>> >>>>
>>>>> >>>> Blog2:
>>>>> >>>>
>>>>> >>>>
>>>>> >>>> http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
>>>>> >>>>
>>>>> >>>>
>>>>> >>>> Blog 1 simply describes a configuration change with a checkpoint
>>>>> >>>> directory, and blog 2 gives details on how to save offsets to
>>>>> >>>> ZooKeeper. Can you please help me out with the right approach?
>>>>> >>>>
>>>>> >>>> Thanks,
>>>>> >>>> Asmath
>>>>> >>>>
>>>>> >>>>
>>>>> >>
>>>>> >
>>>>
>>>>
>>>
>>
>
