package com.lightbend.broker

import kafka.cluster.{ BrokerEndPoint, EndPoint }
import kafka.common.{ BrokerEndPointNotAvailableException, BrokerNotAvailableException, KafkaException }
import kafka.utils.{ Json, ZkUtils }
import org.apache.kafka.common.Node
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.SecurityProtocol

import scala.collection.mutable.ListBuffer

// Counterpart of kafka.cluster.Broker that additionally carries the broker's JMX port and registered host.

object BrokerWithJMX {

  // Root znode under which the per-broker registration nodes are looked up.
  private val BrokerIdsPath = "/brokers/ids"

  // Field names read from the broker registration JSON.
  private val HostKey = "host"
  private val PortKey = "port"
  private val VersionKey = "version"
  private val EndpointsKey = "endpoints"
  private val RackKey = "rack"
  private val JmxPortKey = "jmx_port"
  private val ListenerSecurityProtocolMapKey = "listener_security_protocol_map"
  private val TimestampKey = "timestamp"

  /**
   * Create a broker object from its id and its registration JSON string.
   *
   * Version 1 JSON schema for a broker is:
   * {
   *   "version":1,
   *   "host":"localhost",
   *   "port":9092,
   *   "jmx_port":9999,
   *   "timestamp":"2233345666"
   * }
   *
   * Version 2 JSON schema for a broker is:
   * {
   *   "version":2,
   *   "host":"localhost",
   *   "port":9092,
   *   "jmx_port":9999,
   *   "timestamp":"2233345666",
   *   "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"]
   * }
   *
   * Version 3 JSON schema for a broker is:
   * {
   *   "version":3,
   *   "host":"localhost",
   *   "port":9092,
   *   "jmx_port":9999,
   *   "timestamp":"2233345666",
   *   "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"],
   *   "rack":"dc1"
   * }
   *
   * Version 4 (current) JSON schema for a broker is:
   * {
   *   "version":4,
   *   "host":"localhost",
   *   "port":9092,
   *   "jmx_port":9999,
   *   "timestamp":"2233345666",
   *   "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
   *   "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"},
   *   "rack":"dc1"
   * }
   *
   * @param id               the broker id to assign to the resulting object
   * @param brokerInfoString the registration JSON; `null` is treated as "broker does not exist"
   * @return the parsed broker; `jmxPort` is -1 when the JSON carries no "jmx_port" field
   * @throws BrokerNotAvailableException if `brokerInfoString` is `null`
   * @throws KafkaException if the JSON cannot be parsed, has an unsupported version, or lacks a
   *                        required field; the underlying failure is attached as the cause
   */
  def createBroker(id: Int, brokerInfoString: String): BrokerWithJMX = {
    if (brokerInfoString == null)
      throw new BrokerNotAvailableException(s"Broker id $id does not exist")
    try {
      Json.parseFull(brokerInfoString) match {
        case Some(parsed) =>
          val brokerInfo = parsed.asInstanceOf[Map[String, Any]]
          val version = brokerInfo(VersionKey).asInstanceOf[Int]
          val endpoints: Seq[EndPoint] = version match {
            case v if v < 1 =>
              throw new KafkaException(s"Unsupported version of broker registration: $brokerInfoString")
            case 1 =>
              // Version 1 registrations only describe a single PLAINTEXT host/port pair.
              val host = brokerInfo(HostKey).asInstanceOf[String]
              val port = brokerInfo(PortKey).asInstanceOf[Int]
              val securityProtocol = SecurityProtocol.PLAINTEXT
              Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol))
            case _ =>
              // The listener -> protocol map is optional; when absent, None is handed to
              // EndPoint.createEndPoint, which then resolves the protocol itself.
              val securityProtocolMap = brokerInfo.get(ListenerSecurityProtocolMapKey).map(
                _.asInstanceOf[Map[String, String]].map {
                  case (listenerName, securityProtocol) =>
                    new ListenerName(listenerName) -> SecurityProtocol.forName(securityProtocol)
                }
              )
              val listeners = brokerInfo(EndpointsKey).asInstanceOf[List[String]]
              listeners.map(EndPoint.createEndPoint(_, securityProtocolMap))
          }
          val rack = brokerInfo.get(RackKey).filter(_ != null).map(_.asInstanceOf[String])
          val jmxPort = brokerInfo.getOrElse(JmxPortKey, -1).asInstanceOf[Int]
          val host = brokerInfo(HostKey).asInstanceOf[String]

          BrokerWithJMX(id, endpoints, rack, jmxPort, host)
        case None =>
          throw new BrokerNotAvailableException(s"Broker id $id does not exist")
      }
    } catch {
      // Only wrap non-fatal failures: fatal JVM errors (OutOfMemoryError, InterruptedException, ...)
      // must propagate untouched instead of being disguised as a parse problem.
      case scala.util.control.NonFatal(t) =>
        throw new KafkaException(s"Failed to parse the broker info from zookeeper: $brokerInfoString", t)
    }
  }

  /**
   * List the brokers registered under the broker-ids znode.
   *
   * @param zkUtils ZooKeeper accessor used to list and read the broker znodes
   * @param prefix  optional path segment prepended to "/brokers/ids" (as "/prefix/brokers/ids")
   * @return one broker per child znode whose data could still be read; a child that vanished
   *         between listing and reading is skipped rather than failing the whole call
   * @throws KafkaException if a broker's registration data cannot be parsed
   */
  def getBrokers(zkUtils: ZkUtils, prefix: Option[String] = None): Seq[BrokerWithJMX] = {
    val path = prefix.fold(BrokerIdsPath)(p => s"/$p$BrokerIdsPath")
    zkUtils.getChildrenParentMayNotExist(path).flatMap { node =>
      // readDataMaybeNull yields None when the znode no longer exists, which closes the race
      // between listing the children and reading each one.
      zkUtils.readDataMaybeNull(s"$path/$node")._1.map(createBroker(node.toInt, _))
    }
  }
}

/**
 * A broker described by its id, its listener end points, an optional rack, its JMX port and host.
 *
 * Equality (and hashing) is deliberately based only on `jmxPort` and the case-insensitive `host`;
 * `id`, `endPoints` and `rack` do not take part.
 *
 * @param id        the broker id
 * @param endPoints the broker's end points; listener names must be unique
 * @param rack      the rack the broker is in, if any
 * @param jmxPort   the broker's JMX port
 * @param host      the broker's host; NOTE(review): may be null if the registration carried a
 *                  null host, so it is handled null-safely here (confirm against the producer)
 * @throws IllegalArgumentException if two end points share the same listener name
 */
case class BrokerWithJMX(id: Int, endPoints: Seq[EndPoint], rack: Option[String], jmxPort: Int, host: String) {

  private val endPointsMap = endPoints.map(endPoint => endPoint.listenerName -> endPoint).toMap

  // Duplicate listener names collapse in the map, so a size mismatch reveals them.
  if (endPointsMap.size != endPoints.size)
    throw new IllegalArgumentException(s"There is more than one end point with the same listener name: ${endPoints.mkString(",")}")

  // Host folded character by character the same way String.equalsIgnoreCase compares characters,
  // so that brokers considered equal always produce the same hash code.
  private val foldedHost: Option[String] =
    Option(host).map(_.map(c => Character.toLowerCase(Character.toUpperCase(c))))

  override def toString: String =
    s"$id : ${endPointsMap.values.mkString("(", ",", ")")} : ${rack.orNull}"

  /** Build a broker with a single end point and no rack. */
  def this(id: Int, host: String, port: Int, listenerName: ListenerName, protocol: SecurityProtocol, jmxPort: Int) = {
    this(id, Seq(EndPoint(host, port, listenerName, protocol)), None, jmxPort, host)
  }

  /** Build a single-end-point broker from the id, host and port of a [[BrokerEndPoint]]. */
  def this(bep: BrokerEndPoint, listenerName: ListenerName, protocol: SecurityProtocol, jmxPort: Int) = {
    this(bep.id, bep.host, bep.port, listenerName, protocol, jmxPort)
  }

  // Look up the end point for a listener, failing with the exception both public accessors share.
  private def endPointFor(listenerName: ListenerName): EndPoint =
    endPointsMap.getOrElse(
      listenerName,
      throw new BrokerEndPointNotAvailableException(s"End point with listener name ${listenerName.value} not found for broker $id")
    )

  /**
   * @param listenerName the listener whose end point should be used
   * @return a [[Node]] with this broker's id and rack (null when absent) and the end point's host and port
   * @throws BrokerEndPointNotAvailableException if the broker has no end point for `listenerName`
   */
  def getNode(listenerName: ListenerName): Node = {
    val endpoint = endPointFor(listenerName)
    new Node(id, endpoint.host, endpoint.port, rack.orNull)
  }

  /**
   * @param listenerName the listener whose end point should be used
   * @return a [[BrokerEndPoint]] with this broker's id and the end point's host and port
   * @throws BrokerEndPointNotAvailableException if the broker has no end point for `listenerName`
   */
  def getBrokerEndPoint(listenerName: ListenerName): BrokerEndPoint = {
    val endpoint = endPointFor(listenerName)
    new BrokerEndPoint(id, endpoint.host, endpoint.port)
  }

  /** Two brokers are equal when their JMX ports match and their hosts match ignoring case. */
  override def equals(obj: scala.Any): Boolean = obj match {
    case other: BrokerWithJMX =>
      // Null-safe on both sides: two null hosts match, a null never matches a non-null host.
      other.jmxPort == jmxPort &&
        (if (host == null) other.host == null else host.equalsIgnoreCase(other.host))
    case _ => false
  }

  /** Hash over exactly the fields `equals` compares, keeping the equals/hashCode contract. */
  override def hashCode: Int = (jmxPort, foldedHost).hashCode
}