package com.companyx.ep.poc.spark.reporting.process.detail.viewitem.provider

import com.companyx.ep.poc.spark.reporting.process.annotations.Ignore
import com.companyx.ep.poc.spark.reporting.process.detail.model.Data
import com.companyx.ep.poc.spark.reporting.process.detail.model.DetailInputRecord
import com.companyx.ep.poc.spark.reporting.process.detail.success.provider.DetailDataProvider

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext

import java.util.Date

import com.companyx.ep.poc.spark.reporting.process.util.DataUtil
import com.companyx.ep.poc.spark.reporting.process.util.DateUtil

/**
 * Mutable holder for the listing and seller attributes attached to a view-item detail record.
 *
 * Instances are created and filled in by `VISummaryDataProvider.getData`; the per-field notes
 * below describe what that method writes.
 */
class VISummary extends Data {
  // Taken from the matching listing's leaf category id.
  var leafCategoryId: Int = _
  // Taken from the matching listing's item site id; also drives the seller-program lookup.
  var itemSiteId: Int = _
  // Taken from the matching listing's auction type code.
  var auctionTypeCode: Int = _
  // Not assigned by VISummaryDataProvider; keeps the Int default (0) unless set elsewhere.
  var buyerCountryId: Int = _
  // Taken from the matching listing's seller country id.
  var sellerCountryId: Int = _
  // VISummaryDataProvider always writes the constant "0" here.
  var buyerSegment: String = _
  // "Bronze" / "Silver" / "Gold" / "Platinum" / "Titanium" / "Not a Power Seller";
  // left unset (null) when no seller-level row applies to the event.
  var powerSellerLevel: String = _
  // "eTRS" / "Above Standard" / "Standard" / "Below Standard" / "Other";
  // left unset (null) when no seller-level row for the matching program applies.
  var sellerStdLevel: String = _
  // 1 when the listing's BIN price is greater than zero, otherwise 0.
  var isBin: Int = _
  // Id of the viewed item.
  // NOTE(review): @Ignore presumably excludes this field from whatever Data does with the
  // other fields (e.g. output/aggregation) - confirm against the Ignore annotation.
  @Ignore
  var itemId: Long = _
}

/**
 * Builds the [[VISummary]] that accompanies each view-item detail record.
 *
 * Each detail record is enriched with attributes of the listing it refers to and with the
 * seller-standard / power-seller level of the listing's seller.
 */
class VISummaryDataProvider extends DetailDataProvider[VISummary] {

  /**
   * Enriches view-item detail records with listing and seller-level information.
   *
   * Steps:
   *  1. Key every detail record by item id (field 14) and look the listing up in a broadcast
   *     map; records whose item has no listing are dropped.
   *  2. Left-outer-join on the listing's seller id with the seller-level metric rows and, when
   *     a row's validity period contains the session start date (field 2), derive the seller
   *     standard level and the power seller level.
   *  3. Collapse the result to one record per (field 0, field 1, field 2, field 8) key, keeping
   *     the lexicographically greatest level strings seen for that key.
   *
   * @param details   detail records; field 14 is read as the item id (Long), field 2 as the
   *                  session start date string, and fields 0, 1, 2 and 8 form the
   *                  de-duplication key
   * @param startDate reference date; listings are requested from 89 days before it, and it is
   *                  passed unchanged to `DataUtil.getSpsLevelMetricSum`
   * @param sc        Spark context used to load the lookup data and to broadcast the listings
   * @return one (detail record, summary) pair per distinct de-duplication key
   */
  override def getData(details: RDD[DetailInputRecord], startDate: Date, sc: SparkContext) = {
    // Listings are requested starting 89 days before startDate.
    val listings = DataUtil.getDwLstgItem(sc, DateUtil.addDaysToDate(startDate, -89))
    val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

    // The listings are collected on the driver and broadcast, so the lookup below is a
    // map-side join that needs no shuffle.
    val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }.collectAsMap
    val broadCastMap = sc.broadcast(lstgItemMap)
    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] = viEvents.mapPartitions({
      iter =>
        val listingsById = broadCastMap.value
        iter.flatMap {
          case (itemId, viDetail) =>
            // Single lookup: events whose item has no listing yield None and are dropped.
            listingsById.get(itemId).map { listing =>
              val viSummary = new VISummary
              viSummary.leafCategoryId = listing.getLeafCategId().toInt
              viSummary.itemSiteId = listing.getItemSiteId().toInt
              viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
              viSummary.sellerCountryId = listing.getSlrCntryId().toInt
              viSummary.buyerSegment = "0"
              viSummary.isBin = if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0

              // Re-key by seller id for the join with the seller-level metrics.
              val sellerId = listing.getSlrId.toLong
              (sellerId, (viDetail, viSummary, itemId))
            }
        }
    })

    val spsLevelMetricSum = DataUtil.getSpsLevelMetricSum(sc, startDate)
    val spsLvlMetric = spsLevelMetricSum.map { sps => (sps.getUserId.toLong, sps) }
    val viEventsWithListingsJoinSpsLevelMetric = viEventsWithListings.leftOuterJoin(spsLvlMetric).map {
      case (sellerId, ((viEventDetail, viSummary, itemId), spsMetricOpt)) =>
        val sessionStartDt = DateUtil.formatStringAsDate(viEventDetail.get(2).asInstanceOf[String])
        // Levels are derived only when a metric row exists AND both bounds of its validity
        // period are present; the end bound must be checked too, because it is parsed below.
        for {
          spsMetric <- spsMetricOpt
          periodStart <- Option(spsMetric.getSpsSlrLevelSumStartDt)
          periodEnd <- Option(spsMetric.getSpsSlrLevelSumEndDt)
        } {
          val spsSlrLevelSumStartDt = DateUtil.formatStringAsDate(periodStart)
          val spsSlrLevelSumEndDt = DateUtil.formatStringAsDate(periodEnd)
          // The session start must fall inside the period, inclusive at both ends.
          if (sellerId != 0
            && ((sessionStartDt.after(spsSlrLevelSumStartDt) || sessionStartDt.equals(spsSlrLevelSumStartDt))
              && (sessionStartDt.before(spsSlrLevelSumEndDt) || sessionStartDt.equals(spsSlrLevelSumEndDt)))) {
            // Seller-standards program expected for the listing's site.
            val programId = viSummary.itemSiteId match {
              case 0 | 100 => 2
              case 3 => 3
              case 77 => 4
              case _ => 1
            }

            // The standard level is taken only from the row of the matching program.
            if (programId == spsMetric.getSpsPrgrmId.toInt) {
              viSummary.sellerStdLevel = spsMetric.getSpsSlrLevelCd.toInt match {
                case 1 => "eTRS"
                case 2 => "Above Standard"
                case 3 => "Standard"
                case 4 => "Below Standard"
                case _ => "Other"
              }
            }

            viSummary.powerSellerLevel = spsMetric.getCurrPsLvlId.trim() match {
              case "11" => "Bronze"
              case "22" => "Silver"
              case "33" => "Gold"
              case "66" => "Platinum"
              case "77" => "Titanium"
              case _ => "Not a Power Seller"
            }
          }
        }
        ((viEventDetail.get(0), viEventDetail.get(1).asInstanceOf[Long], viEventDetail.get(2), viEventDetail.get(8).asInstanceOf[Int]), (viEventDetail, viSummary, itemId))
    }

    // The join can emit several rows per event (one per metric row of the seller); collapse
    // them, keeping the lexicographically greatest level strings (unset levels count as "").
    viEventsWithListingsJoinSpsLevelMetric.reduceByKey {
      case (kept, other) =>
        val keptSummary = kept._2
        val otherSummary = other._2

        val keptPowerSeller: String = Option(keptSummary.powerSellerLevel).getOrElse("")
        val otherPowerSeller: String = Option(otherSummary.powerSellerLevel).getOrElse("")
        keptSummary.powerSellerLevel =
          if (keptPowerSeller.compareTo(otherPowerSeller) > 0) keptPowerSeller else otherPowerSeller

        val keptStdLevel: String = Option(keptSummary.sellerStdLevel).getOrElse("")
        val otherStdLevel: String = Option(otherSummary.sellerStdLevel).getOrElse("")
        keptSummary.sellerStdLevel =
          if (keptStdLevel.compareTo(otherStdLevel) > 0) keptStdLevel else otherStdLevel

        kept
    }.map {
      case (_, (detail, summary, itemId)) =>
        // Assigned here rather than inside the reduce function so that keys with a single
        // record (for which the reduce function never runs) also carry their item id.
        summary.itemId = itemId
        (detail, summary)
    }
  }
}