Hi guys,
I built a tool in my Kafka tools repository that takes a topic, a ZooKeeper connect string, and a file, and checks for each topic/partition whether the topic is "DRY".

It basically writes the current offsets of each broker to the file. If the combination (topic, broker, partition) existed in the file prior to the run, it compares the current offset of that (topic, broker, partition) with the previous one from the file; if the offset did not grow, it returns the status STUCK for that (topic, broker, partition). Otherwise (1. there was no file, 2. the (topic, broker, partition) was not previously reported, or 3. the previous value for the (topic, broker, partition) is less than the current one), it returns the status OK.

This output can be processed by a monitoring tool such as Nagios,

Thanks

On 07/11/2012 09:21 AM, Guy Doulberg wrote:
I will do so,
After I do so, I will publish this code for anyone to use.


On 07/10/2012 06:36 PM, Jun Rao wrote:
You can potentially build alerting using our tool ConsumerOffsetChecker.
See http://incubator.apache.org/kafka/faq.html on how to use the tool.

Thanks,

Jun

On Tue, Jul 10, 2012 at 8:21 AM, Guy Doulberg <guy.doulb...@conduit.com>wrote:

Hi guys,

How do you monitor a topic, and alert if it has no messages for some time?

I would like to monitor a topic, and report to Nagios so I could get
an alert when something is wrong.


What do you think?






/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.tools


import joptsimple._
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
import kafka.consumer.SimpleConsumer
import collection.mutable.Map
import java.io.FileWriter
import java.io.File
import scala.io.Source
  
// This tool writes the current offsets of each broker to a file.
// If the combination (topic, broker, partition) existed in the file prior to
// the run, it compares the current offset of that (topic, broker, partition)
// with the previous one from the file; if the offset did not grow, it reports
// the status STUCK for that (topic, broker, partition). Otherwise
// (1. there was no file, 2. the (topic, broker, partition) was not previously
// reported, or 3. the previous value for the (topic, broker, partition) is
// less than the current one), it reports the status OK.
// The output can be consumed by a monitoring tool such as Nagios.



object TopicOffsetChecker extends Logging {

  // Cache of per-broker consumers so each broker is contacted over one connection.
  private val consumerMap: Map[String, Option[SimpleConsumer]] = Map()

  // Matches a "brokerId-partitionId" pair, e.g. "0-1".
  private val BidPidPattern = """(\d+)-(\d+)""".r

  // Matches broker registration data, e.g. 127.0.0.1-1315436360737:127.0.0.1:9092
  private val BrokerIpPattern = """.*:(\d+\.\d+\.\d+\.\d+):(\d+$)""".r

  // topic -> ("bid-pid" -> offset-as-string), loaded from the state file written
  // by the previous run.
  private val previousOffsets: Map[String, Map[String, String]] = Map()

  // True while every examined (topic, broker, partition) has advanced since the
  // previous run. (Was initialized to false, which made it permanently false.)
  private var moved = true

  // Path of the state file; set from the --file option in main().
  private var filename = ""

  // Writer for the new state file; created in main(), may be null until then.
  var fileWriter: FileWriter = null

  /**
   * Looks up broker `bid`'s registration data in ZooKeeper and opens a
   * SimpleConsumer against it. Returns None (after logging an error) when the
   * registration data does not match the expected "…:ip:port" format.
   */
  private def getConsumer(zkClient: ZkClient, bid: String): Option[SimpleConsumer] = {
    val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))
    brokerInfo match {
      case BrokerIpPattern(ip, port) =>
        Some(new SimpleConsumer(ip, port.toInt, 10000, 100000))
      case _ =>
        error("Could not parse broker info %s".format(brokerInfo))
        None
    }
  }

  /**
   * Checks one (topic, broker-partition) pair: fetches the current log-end
   * offset, compares it against the offset recorded by the previous run,
   * prints "topic:bid-pid:OK" when the offset grew (or "STUCK" otherwise), and
   * records the current offset in the state file for the next run.
   */
  private def processBroker(zkClient: ZkClient, topic: String, bidPid: String) {
    bidPid match {
      case BidPidPattern(bid, pid) =>
        val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkClient, bid))
        consumerOpt match {
          case Some(consumer) =>
            // -1 requests the latest offset; take the last (most recent) entry.
            val logSize =
              consumer.getOffsetsBefore(topic, pid.toInt, -1, 1).last.toLong
            // Default to 0 when this pair was not in the state file, so a
            // first run always reports OK.
            val prevSize =
              if (previousOffsets.contains(topic) && previousOffsets(topic).contains(bidPid))
                previousOffsets(topic)(bidPid).toLong
              else 0
            val ok = prevSize < logSize
            val statusMsg = if (ok) "OK" else "STUCK"
            moved = moved && ok
            println("%s:%s:%s".format(topic, bidPid, statusMsg))
            fileWriter.write("%s:%s:%s\n".format(topic, bidPid, logSize))
          case None => // ignore brokers whose registration data could not be parsed
        }
      case _ =>
        error("Could not parse broker/partition pair %s".format(bidPid))
    }
  }

  /** Checks every broker/partition pair registered in ZooKeeper for `topic`. */
  private def processTopic(zkClient: ZkClient, topic: String) {
    val topics = List(topic)
    val bidsPids = ZkUtils.getPartitionsForTopics(zkClient, topics.iterator)
    bidsPids(topic).distinct.sorted.foreach {
      bidPid => processBroker(zkClient, topic, bidPid)
    }
  }

  /**
   * Entry point. Parses --zkconnect, --topic, and --file; loads the previous
   * run's offsets from the state file (lines of the form topic:bid-pid:offset);
   * then checks every partition of every requested topic, printing an OK/STUCK
   * status line per partition and rewriting the state file with current offsets.
   */
  def main(args: Array[String]) {
    val parser = new OptionParser()

    val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.").
            withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String]);
    // NOTE: despite the old help text ("all topics if absent"), --topic is
    // validated as required below, so describe it as such.
    val topicsOpt = parser.accepts("topic",
            "Comma-separated list of consumer topics (required).").
            withRequiredArg().ofType(classOf[String])
    val fileOpt = parser.accepts("file", "file name to serve as persistence of the state of the topics").
            withRequiredArg().ofType(classOf[String])
    parser.accepts("help", "Print this message.")

    val options = parser.parse(args : _*)

    if (options.has("help")) {
       parser.printHelpOn(System.out)
       System.exit(0)
    }

    // Both the state file and the topic list are mandatory.
    for (opt <- List(fileOpt, topicsOpt))
      if (!options.has(opt)) {
        System.err.println("Missing required argument: %s".format(opt))
        parser.printHelpOn(System.err)
        System.exit(1)
      }

    val zkConnect = options.valueOf(zkConnectOpt)
    filename = options.valueOf(fileOpt)
    val topics = if (options.has(topicsOpt)) Some(options.valueOf(topicsOpt))
      else None

    // Load the offsets recorded by the previous run, if any.
    val file = new File(filename)
    if (file.exists()) {
      val source = Source.fromFile(file)
      try {
        for (line <- source.getLines()) {
          val parts = line.split(":")
          if (!previousOffsets.contains(parts(0))) {
            previousOffsets(parts(0)) = Map()
          }
          previousOffsets(parts(0))(parts(1)) = parts(2)
        }
      } finally {
        source.close() // was leaked in the original version
      }
      println("Loaded data from: %s".format(filename))
    }

    var zkClient: ZkClient = null
    try {
      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
      fileWriter = new FileWriter(filename)
      val topicList = topics match {
        case Some(x) => x.split(",").view.toList
        // Unreachable because --topic is validated above, but keeps the match
        // exhaustive (the original threw MatchError here in principle).
        case None => List()
      }
      debug("zkConnect = %s; topics = %s; ".format(
        zkConnect, topicList.toString()))

      topicList.sorted.foreach {
        topic => processTopic(zkClient, topic)
      }
    }
    catch {
      case e: Exception => e.printStackTrace()
    }
    finally {
      // fileWriter is still null if the ZkClient constructor threw; guard to
      // avoid an NPE in the finally block masking the original exception.
      if (fileWriter != null) {
        fileWriter.flush()
        fileWriter.close()
      }
      for (consumerOpt <- consumerMap.values) {
        consumerOpt match {
          case Some(consumer) => consumer.close()
          case None => // ignore
        }
      }
      if (zkClient != null)
        zkClient.close()
    }
  }
}

Reply via email to